Repository: incubator-impala Updated Branches: refs/heads/master d6b5f82e3 -> e7cb80b66
IMPALA-5294: Kudu INSERT partitioning fails with constants An INSERT into a Kudu table with a constant value being inserted into a partition column causes an IllegalStateException. This is because DistributedPlanner removes constants from the list of partition exprs before creating the KuduPartitionExpr, but KuduPartitionExpr expects to get one expr per partition column. The fix is to pass the full list of partition exprs into the KuduPartitionExpr, instead of the list that has had constants removed. This preserves the behavior that if all of the partition exprs are constant we fall back to UNPARTITIONED. One complication is that if a partition expr is a NullLiteral, it must be cast to a specific type to be passed to the BE. The InsertStmt will cast the partition exprs to the partition column types, but these casts may be lost from the copies of the partition exprs stored by the KuduPartitionExpr during reset(). To fix this, the KuduPartitionExpr can store the types of the partition cols and recast the partition exprs to those types during analyze(). 
Change-Id: I12cbb319f9a5c47fdbfee347b47650186b27f8f9 Reviewed-on: http://gerrit.cloudera.org:8080/6828 Reviewed-by: Thomas Tauber-Marshall <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b8c8fb1b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b8c8fb1b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b8c8fb1b Branch: refs/heads/master Commit: b8c8fb1b439ceffc6e167089184b14559f03699c Parents: d6b5f82 Author: Thomas Tauber-Marshall <[email protected]> Authored: Mon May 8 19:10:37 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed May 10 22:53:02 2017 +0000 ---------------------------------------------------------------------- .../impala/analysis/KuduPartitionExpr.java | 14 ++++++++-- .../impala/planner/DistributedPlanner.java | 7 ++++- .../queries/QueryTest/kudu_insert.test | 29 ++++++++++++++++++++ 3 files changed, 47 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java index 88644e2..cc42804 100644 --- a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java +++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java @@ -20,6 +20,7 @@ package org.apache.impala.analysis; import java.util.List; import java.util.Set; +import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TExprNode; @@ -42,14 +43,16 @@ public class 
KuduPartitionExpr extends Expr { // The table to use the partitioning scheme from. private final int targetTableId_; + private final KuduTable targetTable_; // Maps from this Epxrs children to column positions in the table, i.e. children_[i] // produces the value for column partitionColPos_[i]. private List<Integer> partitionColPos_; - public KuduPartitionExpr( - int targetTableId, List<Expr> partitionKeyExprs, List<Integer> partitionKeyIdxs) { + public KuduPartitionExpr(int targetTableId, KuduTable targetTable, + List<Expr> partitionKeyExprs, List<Integer> partitionKeyIdxs) { Preconditions.checkState(partitionKeyExprs.size() == partitionKeyIdxs.size()); targetTableId_ = targetTableId; + targetTable_ = targetTable; partitionColPos_ = partitionKeyIdxs; children_.addAll(Expr.cloneList(partitionKeyExprs)); } @@ -60,12 +63,19 @@ public class KuduPartitionExpr extends Expr { protected KuduPartitionExpr(KuduPartitionExpr other) { super(other); targetTableId_ = other.targetTableId_; + targetTable_ = other.targetTable_; partitionColPos_ = other.partitionColPos_; } @Override protected void analyzeImpl(Analyzer analyzer) throws AnalysisException { type_ = Type.INT; + // IMPALA-5294: If one of the children is a NullLiteral, it has to be cast to a type + // to be passed to the BE. 
+ for (int i = 0; i < children_.size(); ++i) { + children_.get(i).castTo( + targetTable_.getColumns().get(partitionColPos_.get(i)).getType()); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 83c8ccb..ac0355e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -246,8 +246,13 @@ public class DistributedPlanner { if (partitionExprs.isEmpty()) { partition = DataPartition.UNPARTITIONED; } else if (insertStmt.getTargetTable() instanceof KuduTable) { + // IMPALA-5294: Don't use 'partitionExpr' here because the constants were removed. + // We need all of the partition exprs to fill in the partition column values for + // each row to send to Kudu to determine the partition number. 
Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID, - partitionExprs, insertStmt.getPartitionColPos()); + (KuduTable) insertStmt.getTargetTable(), + Lists.newArrayList(insertStmt.getPartitionKeyExprs()), + insertStmt.getPartitionColPos()); kuduPartitionExpr.analyze(ctx_.getRootAnalyzer()); partition = DataPartition.kuduPartitioned(kuduPartitionExpr); } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test index 71224ce..9779c7f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test @@ -394,3 +394,32 @@ A, B, C, D, E, F, G, H, I, J ---- TYPES INT,INT,INT,INT,INT,INT,STRING,BOOLEAN,INT,INT ==== +---- QUERY +create table multiple_partition_cols (x bigint, y bigint, z string, primary key(x, y)) +partition by hash(x, y) partitions 8 stored as kudu +---- RESULTS +==== +---- QUERY +# SELECT with constant +insert into multiple_partition_cols select 0, bigint_col, string_col + from functional.alltypes where id = 0 +---- RUNTIME_PROFILE +NumModifiedRows: 1 +NumRowErrors: 0 +---- LABELS +X,Y,Z +---- DML_RESULTS: multiple_partition_cols +0,0,'0' +---- TYPES +BIGINT,BIGINT,STRING +==== +---- QUERY +# SELECT with constant NULL +insert into multiple_partition_cols select bigint_col, null, string_col + from functional.alltypes where id = 1 +---- RESULTS +: 0 +---- RUNTIME_PROFILE +NumModifiedRows: 0 +NumRowErrors: 1 +==== \ No newline at end of file
