Repository: incubator-impala
Updated Branches:
  refs/heads/master d6b5f82e3 -> e7cb80b66


IMPALA-5294: Kudu INSERT partitioning fails with constants

An INSERT into a Kudu table that inserts a constant value into a
partition column causes an IllegalStateException. This is
because DistributedPlanner removes constants from the list of
partition exprs before creating the KuduPartitionExpr, but
KuduPartitionExpr expects to get one expr per partition column.

The fix is to pass the full list of partition exprs into the
KuduPartitionExpr instead of the list with constants removed.
This preserves the existing behavior of falling back to
UNPARTITIONED when all of the partition exprs are constant.

One complication is that a partition expr that is a NullLiteral
must be cast to a specific type before it can be passed to the BE.
InsertStmt casts the partition exprs to the partition column types,
but these casts may be lost from the KuduPartitionExpr's copies of
the partition exprs during reset(). To fix this, the
KuduPartitionExpr stores a reference to the target table and recasts
each partition expr to its partition column's type during analyze().

Change-Id: I12cbb319f9a5c47fdbfee347b47650186b27f8f9
Reviewed-on: http://gerrit.cloudera.org:8080/6828
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b8c8fb1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b8c8fb1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b8c8fb1b

Branch: refs/heads/master
Commit: b8c8fb1b439ceffc6e167089184b14559f03699c
Parents: d6b5f82
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Mon May 8 19:10:37 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed May 10 22:53:02 2017 +0000

----------------------------------------------------------------------
 .../impala/analysis/KuduPartitionExpr.java      | 14 ++++++++--
 .../impala/planner/DistributedPlanner.java      |  7 ++++-
 .../queries/QueryTest/kudu_insert.test          | 29 ++++++++++++++++++++
 3 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java 
b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
index 88644e2..cc42804 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TExprNode;
@@ -42,14 +43,16 @@ public class KuduPartitionExpr extends Expr {
 
   // The table to use the partitioning scheme from.
   private final int targetTableId_;
+  private final KuduTable targetTable_;
   // Maps from this Epxrs children to column positions in the table, i.e. 
children_[i]
   // produces the value for column partitionColPos_[i].
   private List<Integer> partitionColPos_;
 
-  public KuduPartitionExpr(
-      int targetTableId, List<Expr> partitionKeyExprs, List<Integer> 
partitionKeyIdxs) {
+  public KuduPartitionExpr(int targetTableId, KuduTable targetTable,
+      List<Expr> partitionKeyExprs, List<Integer> partitionKeyIdxs) {
     Preconditions.checkState(partitionKeyExprs.size() == 
partitionKeyIdxs.size());
     targetTableId_ = targetTableId;
+    targetTable_ = targetTable;
     partitionColPos_ = partitionKeyIdxs;
     children_.addAll(Expr.cloneList(partitionKeyExprs));
   }
@@ -60,12 +63,19 @@ public class KuduPartitionExpr extends Expr {
   protected KuduPartitionExpr(KuduPartitionExpr other) {
     super(other);
     targetTableId_ = other.targetTableId_;
+    targetTable_ = other.targetTable_;
     partitionColPos_ = other.partitionColPos_;
   }
 
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     type_ = Type.INT;
+    // IMPALA-5294: If one of the children is a NullLiteral, it has to be cast 
to a type
+    // to be passed to the BE.
+    for (int i = 0; i < children_.size(); ++i) {
+      children_.get(i).castTo(
+          targetTable_.getColumns().get(partitionColPos_.get(i)).getType());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 83c8ccb..ac0355e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -246,8 +246,13 @@ public class DistributedPlanner {
     if (partitionExprs.isEmpty()) {
       partition = DataPartition.UNPARTITIONED;
     } else if (insertStmt.getTargetTable() instanceof KuduTable) {
+      // IMPALA-5294: Don't use 'partitionExpr' here because the constants 
were removed.
+      // We need all of the partition exprs to fill in the partition column 
values for
+      // each row to send to Kudu to determine the partition number.
       Expr kuduPartitionExpr = new 
KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
-          partitionExprs, insertStmt.getPartitionColPos());
+          (KuduTable) insertStmt.getTargetTable(),
+          Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
+          insertStmt.getPartitionColPos());
       kuduPartitionExpr.analyze(ctx_.getRootAnalyzer());
       partition = DataPartition.kuduPartitioned(kuduPartitionExpr);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8c8fb1b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 71224ce..9779c7f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -394,3 +394,32 @@ A, B, C, D, E, F, G, H, I, J
 ---- TYPES
 INT,INT,INT,INT,INT,INT,STRING,BOOLEAN,INT,INT
 ====
+---- QUERY
+create table multiple_partition_cols (x bigint, y bigint, z string, primary 
key(x, y))
+partition by hash(x, y) partitions 8 stored as kudu
+---- RESULTS
+====
+---- QUERY
+# SELECT with constant
+insert into multiple_partition_cols select 0, bigint_col, string_col
+  from functional.alltypes where id = 0
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 0
+---- LABELS
+X,Y,Z
+---- DML_RESULTS: multiple_partition_cols
+0,0,'0'
+---- TYPES
+BIGINT,BIGINT,STRING
+====
+---- QUERY
+# SELECT with constant NULL
+insert into multiple_partition_cols select bigint_col, null, string_col
+  from functional.alltypes where id = 1
+---- RESULTS
+: 0
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 1
+====
\ No newline at end of file

Reply via email to