Repository: incubator-impala
Updated Branches:
  refs/heads/master 77304530f -> 8b459dffe


IMPALA-3742: Partitions and sort INSERTs for Kudu tables

Bulk DMLs (INSERT, UPSERT, UPDATE, and DELETE) for Kudu
are currently painful because we send rows in random order,
which creates a lot of work for Kudu: it partitions and
sorts the data before writing, so writes are slow and can
time out.

We can alleviate this by sending the rows to Kudu already
partitioned and sorted. This patch partitions and sorts
rows according to Kudu's partitioning scheme for INSERTs
and UPSERTs. A followup patch will handle UPDATE and DELETE.

It accomplishes this by inserting an exchange node and a sort
node into the plan before the operation. Both the exchange and
the sort are given a KuduPartitionExpr which takes a row and
calls into the Kudu client to return its partition number.
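
Roughly, the per-row lookup works like the following sketch (based
on the Kudu client calls in the patch below; 'table' and 'id_value'
are illustrative placeholders, not the exact Impala code):

  // Build a partitioner for an already-opened Kudu table (see Prepare()).
  kudu::client::KuduPartitionerBuilder builder(table);
  kudu::client::KuduPartitioner* partitioner;
  KUDU_RETURN_IF_ERROR(builder.Build(&partitioner),
      "Failed to build Kudu partitioner.");

  // Per row: copy the partition column values into a KuduPartialRow,
  // then ask Kudu which partition the row falls into.
  std::unique_ptr<kudu::KuduPartialRow> row(table->schema().NewRow());
  KUDU_RETURN_IF_ERROR(row->SetInt32(0, id_value), "Could not set column.");
  int32_t partition = -1;
  partitioner->PartitionRow(*row, &partition);
  // The exchange routes the row to channel 'partition % num_channels';
  // the sort orders rows by (partition, primary key) before the sink.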

It also disallows INSERT hints for Kudu tables, since the
hints that we support (SHUFFLE, CLUSTER, SORTBY) no longer
make sense for them.

Testing:
- Updated planner tests.
- Ran the Kudu functional tests.
- Ran performance tests demonstrating that we can now handle much
  larger inserts without having timeouts.

Change-Id: I84ce0032a1b10958fdf31faef225372c5c38fdc4
Reviewed-on: http://gerrit.cloudera.org:8080/6559
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/801c95f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/801c95f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/801c95f3

Branch: refs/heads/master
Commit: 801c95f39f9de6c29380910274f97748ea8e47a9
Parents: 7730453
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Wed Apr 5 12:35:53 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue May 2 01:40:43 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc                  |  48 +--------
 be/src/exec/kudu-util.cc                        |  52 +++++++++
 be/src/exec/kudu-util.h                         |   9 ++
 be/src/exprs/CMakeLists.txt                     |   1 +
 be/src/exprs/expr-context.h                     |   1 +
 be/src/exprs/expr.cc                            |   4 +
 be/src/exprs/kudu-partition-expr.cc             |  94 ++++++++++++++++
 be/src/exprs/kudu-partition-expr.h              |  62 +++++++++++
 be/src/runtime/coordinator.cc                   |  12 +++
 be/src/runtime/data-stream-sender.cc            |  42 ++++++--
 be/src/runtime/data-stream-sender.h             |   7 +-
 be/src/scheduling/scheduler.cc                  |   3 +-
 common/thrift/Exprs.thrift                      |  16 ++-
 common/thrift/Partitions.thrift                 |   8 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  70 +++++++-----
 .../impala/analysis/KuduPartitionExpr.java      |  94 ++++++++++++++++
 .../org/apache/impala/catalog/KuduTable.java    |   9 ++
 .../apache/impala/planner/DataPartition.java    |   8 +-
 .../impala/planner/DistributedPlanner.java      |  22 +++-
 .../java/org/apache/impala/planner/Planner.java |  16 ++-
 .../org/apache/impala/planner/TableSink.java    |   2 -
 .../impala/analysis/AnalyzeStmtsTest.java       |  16 ++-
 .../impala/analysis/AnalyzeUpsertStmtTest.java  |   9 +-
 .../queries/PlannerTest/kudu-upsert.test        | 106 ++++++++++++++++++-
 .../queries/PlannerTest/kudu.test               |  39 ++++---
 .../queries/QueryTest/kudu_insert.test          |  36 -------
 26 files changed, 616 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 9b0085c..f09b832 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -251,53 +251,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
       }
 
       PrimitiveType type = output_expr_ctxs_[j]->root()->type().type;
-      switch (type) {
-        case TYPE_VARCHAR:
-        case TYPE_STRING: {
-          StringValue* sv = reinterpret_cast<StringValue*>(value);
-          kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
-              "Could not add Kudu WriteOp.");
-          break;
-        }
-        case TYPE_FLOAT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetFloat(col, *reinterpret_cast<float*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_DOUBLE:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetDouble(col, *reinterpret_cast<double*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_BOOLEAN:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetBool(col, *reinterpret_cast<bool*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_TINYINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt8(col, *reinterpret_cast<int8_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_SMALLINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt16(col, *reinterpret_cast<int16_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_INT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt32(col, *reinterpret_cast<int32_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_BIGINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt64(col, *reinterpret_cast<int64_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        default:
-          return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
-      }
+      WriteKuduRowValue(write->mutable_row(), col, type, value);
     }
     if (add_row) write_ops.push_back(move(write));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index e7afb7c..59a4c81 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -27,6 +27,7 @@
 #include "common/logging.h"
 #include "common/names.h"
 #include "common/status.h"
+#include "runtime/string-value.h"
 
 using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
@@ -107,4 +108,55 @@ void InitKuduLogging() {
   kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
 }
 
+Status WriteKuduRowValue(kudu::KuduPartialRow* row, int col, PrimitiveType type,
+    const void* value, bool copy_strings) {
+  // TODO: codegen this to eliminate branching on type.
+  switch (type) {
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+      const StringValue* sv = reinterpret_cast<const StringValue*>(value);
+      kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
+      if (copy_strings) {
+        KUDU_RETURN_IF_ERROR(row->SetString(col, slice), "Could not set Kudu row value.");
+      } else {
+        KUDU_RETURN_IF_ERROR(
+            row->SetStringNoCopy(col, slice), "Could not set Kudu row value.");
+      }
+      break;
+    }
+    case TYPE_FLOAT:
+      KUDU_RETURN_IF_ERROR(row->SetFloat(col, *reinterpret_cast<const float*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_DOUBLE:
+      KUDU_RETURN_IF_ERROR(row->SetDouble(col, *reinterpret_cast<const double*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_BOOLEAN:
+      KUDU_RETURN_IF_ERROR(row->SetBool(col, *reinterpret_cast<const bool*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_TINYINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt8(col, *reinterpret_cast<const int8_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_SMALLINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt16(col, *reinterpret_cast<const int16_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_INT:
+      KUDU_RETURN_IF_ERROR(row->SetInt32(col, *reinterpret_cast<const int32_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_BIGINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt64(col, *reinterpret_cast<const int64_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    default:
+      return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
+  }
+
+  return Status::OK();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 7774812..27765df 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -21,6 +21,8 @@
 #include <kudu/client/callbacks.h>
 #include <kudu/client/client.h>
 
+#include "runtime/types.h"
+
 namespace impala {
 
 class Status;
@@ -55,6 +57,13 @@ void InitKuduLogging();
 void LogKuduMessage(kudu::client::KuduLogSeverity severity, const char* filename,
     int line_number, const struct ::tm* time, const char* message, size_t message_len);
 
+/// Casts 'value' according to 'type' and writes it into 'row' at position 'col'.
+/// If 'type' is STRING or VARCHAR, 'copy_strings' determines if 'value' will be copied
+/// into memory owned by the row. If false, string data must remain valid while the row is
+/// being used.
+Status WriteKuduRowValue(kudu::KuduPartialRow* row, int col, PrimitiveType type,
+    const void* value, bool copy_strings = true);
+
 /// Takes a Kudu status and returns an impala one, if it's not OK.
 #define KUDU_RETURN_IF_ERROR(expr, prepend) \
   do { \
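
As a usage sketch of the new helper (the row pointer, column index, and
value below are illustrative; a row comes from KuduTable::schema().NewRow()):

  // Sketch: write one typed value into a Kudu row via the shared helper.
  int32_t val = 42;  // illustrative value for an INT column
  RETURN_IF_ERROR(WriteKuduRowValue(row, /*col=*/0, TYPE_INT, &val));
  // For STRING/VARCHAR columns, copy_strings = false avoids a copy, but the
  // string data must then stay valid for as long as the row is in use.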

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 87a9ae0..c3cc151 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(Exprs
   in-predicate-ir.cc
   is-not-empty-predicate.cc
   is-null-predicate-ir.cc
+  kudu-partition-expr.cc
   like-predicate.cc
   like-predicate-ir.cc
   literal.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 6903105..fe9bf44 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -152,6 +152,7 @@ class ExprContext {
   friend class CaseExpr;
   friend class HiveUdfCall;
   friend class ScalarFnCall;
+  friend class KuduPartitionExpr;
 
   /// FunctionContexts for each registered expression. The FunctionContexts are created
   /// and owned by this ExprContext.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.cc b/be/src/exprs/expr.cc
index 32c16a0..c7fe2ab 100644
--- a/be/src/exprs/expr.cc
+++ b/be/src/exprs/expr.cc
@@ -44,6 +44,7 @@
 #include "exprs/in-predicate.h"
 #include "exprs/is-not-empty-predicate.h"
 #include "exprs/is-null-predicate.h"
+#include "exprs/kudu-partition-expr.h"
 #include "exprs/like-predicate.h"
 #include "exprs/literal.h"
 #include "exprs/math-functions.h"
@@ -262,6 +263,9 @@ Status Expr::CreateExpr(ObjectPool* pool, const TExprNode& texpr_node, Expr** ex
     case TExprNodeType::IS_NOT_EMPTY_PRED:
       *expr = pool->Add(new IsNotEmptyPredicate(texpr_node));
       return Status::OK();
+    case TExprNodeType::KUDU_PARTITION_EXPR:
+      *expr = pool->Add(new KuduPartitionExpr(texpr_node));
+      return Status::OK();
     default:
       stringstream os;
       os << "Unknown expr node type: " << texpr_node.node_type;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/kudu-partition-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.cc b/be/src/exprs/kudu-partition-expr.cc
new file mode 100644
index 0000000..2faaee5
--- /dev/null
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/kudu-partition-expr.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "exec/kudu-util.h"
+#include "exprs/expr-context.h"
+#include "runtime/query-state.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "runtime/types.h"
+
+namespace impala {
+
+KuduPartitionExpr::KuduPartitionExpr(const TExprNode& node)
+  : Expr(node), tkudu_partition_expr_(node.kudu_partition_expr) {}
+
+Status KuduPartitionExpr::Prepare(
+    RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx) {
+  RETURN_IF_ERROR(Expr::Prepare(state, row_desc, ctx));
+  DCHECK_EQ(tkudu_partition_expr_.referenced_columns.size(), children_.size());
+
+  // Create the KuduPartitioner we'll use to get the partition index for each row.
+  TableDescriptor* table_desc =
+      state->desc_tbl().GetTableDescriptor(tkudu_partition_expr_.target_table_id);
+  DCHECK(table_desc != nullptr);
+  DCHECK(dynamic_cast<KuduTableDescriptor*>(table_desc))
+      << "Target table for KuduPartitioner must be a Kudu table.";
+  table_desc_ = static_cast<KuduTableDescriptor*>(table_desc);
+  kudu::client::KuduClient* client;
+  RETURN_IF_ERROR(
+      state->query_state()->GetKuduClient(table_desc_->kudu_master_addresses(), &client));
+  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table;
+  KUDU_RETURN_IF_ERROR(
+      client->OpenTable(table_desc_->table_name(), &table), "Failed to open Kudu table.");
+  kudu::client::KuduPartitionerBuilder b(table);
+  kudu::client::KuduPartitioner* partitioner;
+  KUDU_RETURN_IF_ERROR(b.Build(&partitioner), "Failed to build Kudu partitioner.");
+  partitioner_.reset(partitioner);
+  row_.reset(table->schema().NewRow());
+
+  return Status::OK();
+}
+
+IntVal KuduPartitionExpr::GetIntVal(ExprContext* ctx, const TupleRow* row) {
+  for (int i = 0; i < children_.size(); ++i) {
+    void* val = ctx->GetValue(GetChild(i), row);
+    if (val == NULL) {
+      // We don't currently support nullable partition columns, but pass it along and let
+      // the KuduTableSink generate the error message.
+      return IntVal(-1);
+    }
+    int col = tkudu_partition_expr_.referenced_columns[i];
+    const ColumnDescriptor& col_desc = table_desc_->col_descs()[col];
+    PrimitiveType type = col_desc.type().type;
+    Status s = WriteKuduRowValue(row_.get(), col, type, val, false);
+    // This can only fail if we set a col to an incorrect type, which would be a bug in
+    // planning, so we can DCHECK.
+    DCHECK(s.ok()) << "WriteKuduRowValue failed for col = " << col_desc.name()
+                   << " and type = " << col_desc.type() << ": " << s.GetDetail();
+  }
+
+  int32_t kudu_partition = -1;
+  kudu::Status s = partitioner_->PartitionRow(*row_.get(), &kudu_partition);
+  // This can only fail if we fail to supply some of the partition cols, which would be a
+  // bug in planning, so we can DCHECK.
+  DCHECK(s.ok()) << "KuduPartitioner::PartitionRow failed on row = '" << row_->ToString()
+                 << "': " << s.ToString();
+  return IntVal(kudu_partition);
+}
+
+Status KuduPartitionExpr::GetCodegendComputeFn(
+    LlvmCodeGen* codegen, llvm::Function** fn) {
+  return Status("Error: KuduPartitionExpr::GetCodegendComputeFn not 
implemented.");
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/kudu-partition-expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.h b/be/src/exprs/kudu-partition-expr.h
new file mode 100644
index 0000000..6620338
--- /dev/null
+++ b/be/src/exprs/kudu-partition-expr.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_
+#define IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_
+
+#include <kudu/client/client.h>
+
+#include "exprs/expr.h"
+
+namespace impala {
+
+class KuduTableDescriptor;
+class TExprNode;
+class TKuduPartitionExpr;
+
+/// Expr that calls into the Kudu client to determine the partition index for rows.
+/// Returns -1 if the row doesn't have a partition or if an error is encountered.
+/// The children of this Expr produce the values for the partition columns.
+class KuduPartitionExpr : public Expr {
+ protected:
+  friend class Expr;
+
+  KuduPartitionExpr(const TExprNode& node);
+
+  virtual Status Prepare(
+      RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx);
+
+  virtual IntVal GetIntVal(ExprContext* ctx, const TupleRow* row);
+
+  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn);
+
+ private:
+  TKuduPartitionExpr tkudu_partition_expr_;
+
+  /// Descriptor of the table to use the partitioning scheme from. Set in Prepare().
+  KuduTableDescriptor* table_desc_;
+
+  /// Used to call into Kudu to determine partitions. Set in Prepare().
+  std::unique_ptr<kudu::client::KuduPartitioner> partitioner_;
+
+  /// Stores the col values for each row that is partitioned.
+  std::unique_ptr<kudu::KuduPartialRow> row_;
+};
+
+} // namespace impala
+
+#endif // IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a8284c2..bede0ef 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1842,6 +1842,18 @@ void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
     table_ids.insert(fragment.output_sink.table_sink.target_table_id);
   }
 
+  // For DataStreamSinks that partition according to the partitioning scheme of a Kudu
+  // table, we need the corresponding tableId.
+  if (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
+      && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
+      && fragment.output_sink.stream_sink.output_partition.type == TPartitionType::KUDU) {
+    TDataPartition partition = fragment.output_sink.stream_sink.output_partition;
+    DCHECK_EQ(partition.partition_exprs.size(), 1);
+    DCHECK(partition.partition_exprs[0].nodes[0].__isset.kudu_partition_expr);
+    table_ids.insert(
+        partition.partition_exprs[0].nodes[0].kudu_partition_expr.target_table_id);
+  }
+
   // Iterate over all TTableDescriptor(s) and add the ones that are needed.
   for (const TTableDescriptor& table_desc: desc_tbl_.tableDescriptors) {
     if (table_ids.find(table_desc.id) == table_ids.end()) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 9e0ddc7..dc98c49 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -331,6 +331,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
     int per_channel_buffer_size)
   : DataSink(row_desc),
     sender_id_(sender_id),
+    partition_type_(sink.output_partition.type),
     current_channel_idx_(0),
     flushed_(false),
     closed_(false),
@@ -339,13 +340,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
     thrift_transmit_timer_(NULL),
     bytes_sent_counter_(NULL),
     total_sent_rows_counter_(NULL),
-    dest_node_id_(sink.dest_node_id) {
+    dest_node_id_(sink.dest_node_id),
+    next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
   DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
       || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-      || sink.output_partition.type == TPartitionType::RANDOM);
-  broadcast_ = sink.output_partition.type == TPartitionType::UNPARTITIONED;
-  random_ = sink.output_partition.type == TPartitionType::RANDOM;
+      || sink.output_partition.type == TPartitionType::RANDOM
+      || sink.output_partition.type == TPartitionType::KUDU);
   // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
@@ -354,17 +355,18 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
                     sink.dest_node_id, per_channel_buffer_size));
   }
 
-  if (broadcast_ || random_) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM) {
     // Randomize the order we open/transmit to channels to avoid thundering herd problems.
     srand(reinterpret_cast<uint64_t>(this));
     random_shuffle(channels_.begin(), channels_.end());
   }
 
-  if (sink.output_partition.type == TPartitionType::HASH_PARTITIONED) {
+  if (partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::KUDU) {
     // TODO: move this to Init()? would need to save 'sink' somewhere
-    Status status =
-        Expr::CreateExprTrees(pool, sink.output_partition.partition_exprs,
-                              &partition_expr_ctxs_);
+    Status status = Expr::CreateExprTrees(
+        pool, sink.output_partition.partition_exprs, &partition_expr_ctxs_);
     DCHECK(status.ok());
   }
 }
@@ -420,7 +422,8 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   DCHECK(!flushed_);
 
   if (batch->num_rows() == 0) return Status::OK();
-  if (broadcast_ || channels_.size() == 1) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || channels_.size() == 1) {
     // current_thrift_batch_ is *not* the one that was written by the last call
     // to Serialize()
     RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
@@ -431,7 +434,7 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     }
     current_thrift_batch_ =
         (current_thrift_batch_ == &thrift_batch1_ ? &thrift_batch2_ : &thrift_batch1_);
-  } else if (random_) {
+  } else if (partition_type_ == TPartitionType::RANDOM) {
     // Round-robin batches among channels. Wait for the current channel to finish its
     // rpc before overwriting its batch.
     Channel* current_channel = channels_[current_channel_idx_];
@@ -439,8 +442,25 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     RETURN_IF_ERROR(SerializeBatch(batch, current_channel->thrift_batch()));
     RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch()));
     current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
+  } else if (partition_type_ == TPartitionType::KUDU) {
+    DCHECK_EQ(partition_expr_ctxs_.size(), 1);
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      int32_t partition =
+          *reinterpret_cast<int32_t*>(partition_expr_ctxs_[0]->GetValue(row));
+      if (partition < 0) {
+        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
+        partition = next_unknown_partition_;
+        ++next_unknown_partition_;
+      }
+      channels_[partition % num_channels]->AddRow(row);
+    }
   } else {
+    DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
     // hash-partition batch's rows across channels
+    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
+    // once we have codegen here.
     int num_channels = channels_.size();
     for (int i = 0; i < batch->num_rows(); ++i) {
       TupleRow* row = batch->GetRow(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 60b9025..d157577 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -104,8 +104,7 @@ class DataStreamSender : public DataSink {
   /// Sender instance id, unique within a fragment.
   int sender_id_;
   RuntimeState* state_;
-  bool broadcast_;  // if true, send all rows on all channels
-  bool random_; // if true, round-robins row batches among channels
+  TPartitionType::type partition_type_; // The type of partitioning to perform.
   int current_channel_idx_; // index of current channel to send to if random_ == true
 
   /// If true, this sender has called FlushFinal() successfully.
@@ -139,6 +138,10 @@ class DataStreamSender : public DataSink {
 
   /// Identifier of the destination plan node.
   PlanNodeId dest_node_id_;
+
+  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
+  /// or when errors are encountered.
+  int next_unknown_partition_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index bca5965..988a0b4 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -304,7 +304,8 @@ void Scheduler::ComputeFragmentExecParams(QuerySchedule* schedule) {
       const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
       DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
           || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-          || sink.output_partition.type == TPartitionType::RANDOM);
+          || sink.output_partition.type == TPartitionType::RANDOM
+          || sink.output_partition.type == TPartitionType::KUDU);
       PlanNodeId exch_id = sink.dest_node_id;
       int sender_id_base = dest_params->per_exch_num_senders[exch_id];
       dest_params->per_exch_num_senders[exch_id] +=

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/common/thrift/Exprs.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Exprs.thrift b/common/thrift/Exprs.thrift
index fc0f4ee..7c88f2b 100644
--- a/common/thrift/Exprs.thrift
+++ b/common/thrift/Exprs.thrift
@@ -37,7 +37,8 @@ enum TExprNodeType {
   TUPLE_IS_NULL_PRED,
   FUNCTION_CALL,
   AGGREGATE_EXPR,
-  IS_NOT_EMPTY_PRED
+  IS_NOT_EMPTY_PRED,
+  KUDU_PARTITION_EXPR
 }
 
 struct TBoolLiteral {
@@ -122,6 +123,18 @@ struct TAggregateExpr {
   2: required list<Types.TColumnType> arg_types;
 }
 
+// Expr used to call into the Kudu client to determine the partition index for rows. The
+// values for the partition columns are produced by its children.
+struct TKuduPartitionExpr {
+  // The Kudu table to use the partitioning scheme from.
+  1: required Types.TTableId target_table_id
+
+  // Mapping from the children of this expr to their column positions in the table, i.e.
+  // child(i) produces the value for column referenced_columns[i].
+  // TODO: Include the partition cols in the KuduTableDescriptor and remove this.
+  2: required list<i32> referenced_columns
+}
+
 // This is essentially a union over the subclasses of Expr.
 struct TExprNode {
   1: required TExprNodeType node_type
@@ -151,6 +164,7 @@ struct TExprNode {
   18: optional TDecimalLiteral decimal_literal
   19: optional TAggregateExpr agg_expr
   20: optional TTimestampLiteral timestamp_literal
+  21: optional TKuduPartitionExpr kudu_partition_expr
 }
 
 // A flattened representation of a tree of Expr nodes, obtained by depth-first

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/common/thrift/Partitions.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Partitions.thrift b/common/thrift/Partitions.thrift
index 0e918d1..efc9b08 100644
--- a/common/thrift/Partitions.thrift
+++ b/common/thrift/Partitions.thrift
@@ -32,7 +32,13 @@ enum TPartitionType {
 
   // ordered partition on a list of exprs
   // (partition bounds don't overlap)
-  RANGE_PARTITIONED
+  RANGE_PARTITIONED,
+
+  // use the partitioning scheme of a Kudu table
+  // TODO: this is a special case now because Kudu supports multilevel partition
+  // schemes. We should add something like lists of TDataPartitions to reflect that
+  // and then this can be removed. (IMPALA-5255)
+  KUDU
 }
 
 // Specification of how a single logical data stream is partitioned.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index b7172da..83c6e40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -106,9 +106,17 @@ public class InsertStmt extends StatementBase {
   // Set in analyze(). Contains metadata of target table to determine type of sink.
   private Table table_;
 
-  // Set in analyze(). Exprs corresponding to the partitionKeyValues.
+  // Set in analyze(). Exprs correspond to the partitionKeyValues, if specified, or to
+  // the partition columns for Kudu tables.
   private List<Expr> partitionKeyExprs_ = Lists.newArrayList();
 
+  // Set in analyze(). Maps exprs in partitionKeyExprs_ to their column's position in the
+  // table, e.g. partitionKeyExprs_[i] corresponds to table_.columns(partitionColPos_[i]).
+  // For Kudu tables, the primary keys are a leading subset of the cols, and the partition
+  // cols can be any subset of the primary keys, meaning that this list will be in
+  // ascending order from '0' to '# primary key cols - 1' but may leave out some numbers.
+  private List<Integer> partitionColPos_ = Lists.newArrayList();
+
   // Indicates whether this insert stmt has a shuffle or noshuffle plan hint.
   // Both flags may be false. Only one of them may be true, not both.
   // Shuffle forces data repartitioning before the data sink, and noshuffle
@@ -218,6 +226,7 @@ public class InsertStmt extends StatementBase {
     queryStmt_.reset();
     table_ = null;
     partitionKeyExprs_.clear();
+    partitionColPos_.clear();
     hasShuffleHint_ = false;
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
@@ -633,6 +642,11 @@ public class InsertStmt extends StatementBase {
     List<String> tmpPartitionKeyNames = new ArrayList<String>();
 
     int numClusteringCols = (tbl instanceof HBaseTable) ? 0 : tbl.getNumClusteringCols();
+    boolean isKuduTable = table_ instanceof KuduTable;
+    Set<String> kuduPartitionColumnNames = null;
+    if (isKuduTable) {
+      kuduPartitionColumnNames = ((KuduTable) table_).getPartitionColumnNames();
+    }
 
     // Check dynamic partition columns for type compatibility.
     for (int i = 0; i < selectListExprs.size(); ++i) {
@@ -643,6 +657,11 @@ public class InsertStmt extends StatementBase {
         // This is a dynamic clustering column
         tmpPartitionKeyExprs.add(compatibleExpr);
         tmpPartitionKeyNames.add(targetColumn.getName());
+      } else if (isKuduTable) {
+        if (kuduPartitionColumnNames.contains(targetColumn.getName())) {
+          tmpPartitionKeyExprs.add(compatibleExpr);
+          tmpPartitionKeyNames.add(targetColumn.getName());
+        }
       }
       selectListExprs.set(i, compatibleExpr);
     }
@@ -663,24 +682,28 @@ public class InsertStmt extends StatementBase {
     }
 
     // Reorder the partition key exprs and names to be consistent with the target table
-    // declaration.  We need those exprs in the original order to create the corresponding
-    // Hdfs folder structure correctly.
-    for (Column c: table_.getColumns()) {
+    // declaration, and store their column positions.  We need those exprs in the original
+    // order to create the corresponding Hdfs folder structure correctly, or the indexes
+    // to construct rows to pass to the Kudu partitioning API.
+    for (int i = 0; i < table_.getColumns().size(); ++i) {
+      Column c = table_.getColumns().get(i);
       for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
         if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
           partitionKeyExprs_.add(tmpPartitionKeyExprs.get(j));
+          partitionColPos_.add(i);
           break;
         }
       }
     }
 
-    Preconditions.checkState(partitionKeyExprs_.size() == numClusteringCols);
+    Preconditions.checkState(
+        (isKuduTable && partitionKeyExprs_.size() == kuduPartitionColumnNames.size())
+        || partitionKeyExprs_.size() == numClusteringCols);
     // Make sure we have stats for partitionKeyExprs
     for (Expr expr: partitionKeyExprs_) {
       expr.analyze(analyzer);
     }
 
-    boolean isKuduTable = table_ instanceof KuduTable;
     // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
     // order, and add NULL expressions to all missing columns, unless this is an UPSERT.
     ArrayList<Column> columns = table_.getColumnsInHiveOrder();
@@ -741,9 +764,13 @@ public class InsertStmt extends StatementBase {
   }
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
-    if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
+    if (planHints_.isEmpty()) return;
+    if (isUpsert_) {
+      throw new AnalysisException("Hints not supported in UPSERT statements.");
+    }
+    if (table_ instanceof HBaseTable || table_ instanceof KuduTable) {
       throw new AnalysisException(String.format("INSERT hints are only 
supported for " +
-          "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
+          "inserting into Hdfs tables: %s", getTargetTableName()));
     }
     boolean hasNoClusteredHint = false;
     for (PlanHint hint: planHints_) {
@@ -776,29 +803,17 @@ public class InsertStmt extends StatementBase {
   }
 
   private void analyzeSortByHint(PlanHint hint) throws AnalysisException {
-    // HBase tables don't support insert hints at all (must be enforced by the caller).
-    Preconditions.checkState(!(table_ instanceof HBaseTable));
-
-    if (isUpsert_) {
-      throw new AnalysisException("SORTBY hint is not supported in UPSERT statements.");
-    }
+    // HBase and Kudu tables don't support insert hints at all (must be enforced by the caller).
+    Preconditions.checkState(!(table_ instanceof HBaseTable || table_ instanceof KuduTable));
 
     List<String> columnNames = hint.getArgs();
     Preconditions.checkState(!columnNames.isEmpty());
     for (String columnName: columnNames) {
-      // Make sure it's not a Kudu primary key column or Hdfs partition column.
-      if (table_ instanceof KuduTable) {
-        KuduTable kuduTable = (KuduTable) table_;
-        if (kuduTable.isPrimaryKeyColumn(columnName)) {
-          throw new AnalysisException(String.format("SORTBY hint column list 
must not " +
-              "contain Kudu primary key column: '%s'", columnName));
-        }
-      } else {
-        for (Column tableColumn: table_.getClusteringColumns()) {
-          if (tableColumn.getName().equals(columnName)) {
-            throw new AnalysisException(String.format("SORTBY hint column list 
must " +
-                "not contain Hdfs partition column: '%s'", columnName));
-          }
+      // Make sure it's not an Hdfs partition column.
+      for (Column tableColumn: table_.getClusteringColumns()) {
+        if (tableColumn.getName().equals(columnName)) {
+          throw new AnalysisException(String.format("SORTBY hint column list 
must " +
+              "not contain Hdfs partition column: '%s'", columnName));
         }
       }
 
@@ -843,6 +858,7 @@ public class InsertStmt extends StatementBase {
    */
   public QueryStmt getQueryStmt() { return queryStmt_; }
   public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
+  public List<Integer> getPartitionColPos() { return partitionColPos_; }
   public boolean hasShuffleHint() { return hasShuffleHint_; }
   public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
   public boolean hasClusteredHint() { return hasClusteredHint_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
new file mode 100644
index 0000000..88644e2
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.thrift.TExprNodeType;
+import org.apache.impala.thrift.TKuduPartitionExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Internal expr that calls into the Kudu client to determine the partition index for
+ * a given row. Returns -1 for rows that do not correspond to a partition. The children of
+ * this Expr produce the values for the partition columns.
+ */
+public class KuduPartitionExpr extends Expr {
+  private final static Logger LOG = LoggerFactory.getLogger(KuduPartitionExpr.class);
+
+  // The table to use the partitioning scheme from.
+  private final int targetTableId_;
+  // Maps from this Expr's children to column positions in the table, i.e. children_[i]
+  // produces the value for column partitionColPos_[i].
+  private List<Integer> partitionColPos_;
+
+  public KuduPartitionExpr(
+      int targetTableId, List<Expr> partitionKeyExprs, List<Integer> partitionKeyIdxs) {
+    Preconditions.checkState(partitionKeyExprs.size() == partitionKeyIdxs.size());
+    targetTableId_ = targetTableId;
+    partitionColPos_ = partitionKeyIdxs;
+    children_.addAll(Expr.cloneList(partitionKeyExprs));
+  }
+
+  /**
+   * Copy c'tor used in clone().
+   */
+  protected KuduPartitionExpr(KuduPartitionExpr other) {
+    super(other);
+    targetTableId_ = other.targetTableId_;
+    partitionColPos_ = other.partitionColPos_;
+  }
+
+  @Override
+  protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+    type_ = Type.INT;
+  }
+
+  @Override
+  protected String toSqlImpl() {
+    StringBuilder sb = new StringBuilder("KuduPartition(");
+    for (int i = 0; i < children_.size(); ++i) {
+      if (i != 0) sb.append(", ");
+      sb.append(children_.get(i).toSql());
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  @Override
+  protected void toThrift(TExprNode msg) {
+    msg.node_type = TExprNodeType.KUDU_PARTITION_EXPR;
+    msg.kudu_partition_expr = new TKuduPartitionExpr();
+    for (int i = 0; i < children_.size(); ++i) {
+      msg.kudu_partition_expr.addToReferenced_columns(partitionColPos_.get(i));
+    }
+    msg.kudu_partition_expr.setTarget_table_id(targetTableId_);
+  }
+
+  @Override
+  public Expr clone() { return new KuduPartitionExpr(this); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 5cf5fd1..af3fb46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -150,6 +151,14 @@ public class KuduTable extends Table {
     return ImmutableList.copyOf(partitionBy_);
   }
 
+  public Set<String> getPartitionColumnNames() {
+    Set<String> ret = new HashSet<String>();
+    for (KuduPartitionParam partitionParam : partitionBy_) {
+      ret.addAll(partitionParam.getColumnNames());
+    }
+    return ret;
+  }
+
   /**
    * Returns the range-based partitioning of this table if it exists, null otherwise.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/DataPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataPartition.java b/fe/src/main/java/org/apache/impala/planner/DataPartition.java
index 9e942e4..67a411a 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataPartition.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataPartition.java
@@ -49,7 +49,8 @@ public class DataPartition {
     Preconditions.checkNotNull(exprs);
     Preconditions.checkState(!exprs.isEmpty());
     Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
-        || type == TPartitionType.RANGE_PARTITIONED);
+        || type == TPartitionType.RANGE_PARTITIONED
+        || type == TPartitionType.KUDU);
     type_ = type;
     partitionExprs_ = exprs;
   }
@@ -71,6 +72,10 @@ public class DataPartition {
     return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
   }
 
+  public static DataPartition kuduPartitioned(Expr expr) {
+    return new DataPartition(TPartitionType.KUDU, Lists.newArrayList(expr));
+  }
+
   public boolean isPartitioned() { return type_ != TPartitionType.UNPARTITIONED; }
   public boolean isHashPartitioned() { return type_ == TPartitionType.HASH_PARTITIONED; }
   public TPartitionType getType() { return type_; }
@@ -123,6 +128,7 @@ public class DataPartition {
       case HASH_PARTITIONED: return "HASH";
       case RANGE_PARTITIONED: return "RANGE";
       case UNPARTITIONED: return "UNPARTITIONED";
+      case KUDU: return "KUDU";
       default: return "";
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 85a8ab8..e0e325c 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -23,10 +23,13 @@ import java.util.List;
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
@@ -201,15 +204,19 @@ public class DistributedPlanner {
     // Ignore constants for the sake of partitioning.
     Expr.removeConstants(partitionExprs);
 
-    // Do nothing if the input fragment is already appropriately partitioned.
+    // Do nothing if the input fragment is already appropriately partitioned. TODO: handle
+    // Kudu tables here (IMPALA-5254).
     DataPartition inputPartition = inputFragment.getDataPartition();
-    if (!partitionExprs.isEmpty() &&
-        analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)) {
+    if (!partitionExprs.isEmpty()
+        && analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)
+        && !(insertStmt.getTargetTable() instanceof KuduTable)) {
       return inputFragment;
     }
 
-    // Make a cost-based decision only if no user hint was supplied.
-    if (!insertStmt.hasShuffleHint()) {
+    // Make a cost-based decision only if no user hint was supplied and this is not a Kudu
+    // table. TODO: consider making a cost based decision for Kudu tables.
+    if (!insertStmt.hasShuffleHint()
+        && !(insertStmt.getTargetTable() instanceof KuduTable)) {
       // If the existing partition exprs are a subset of the table partition exprs, check
       // if it is distributed across all nodes. If so, don't repartition.
       if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
@@ -238,6 +245,11 @@ public class DistributedPlanner {
     DataPartition partition;
     if (partitionExprs.isEmpty()) {
       partition = DataPartition.UNPARTITIONED;
+    } else if (insertStmt.getTargetTable() instanceof KuduTable) {
+      Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
+          partitionExprs, insertStmt.getPartitionColPos());
+      kuduPartitionExpr.analyze(ctx_.getRootAnalyzer());
+      partition = DataPartition.kuduPartitioned(kuduPartitionExpr);
     } else {
       partition = DataPartition.hashPartitioned(partitionExprs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index ad5bba5..418e0c6 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -24,10 +24,12 @@ import java.util.List;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.catalog.HBaseTable;
@@ -38,6 +40,7 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
@@ -493,12 +496,17 @@ public class Planner {
        Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = Lists.newArrayList();
 
-    if (insertStmt.hasClusteredHint()) {
-      if (insertStmt.getTargetTable() instanceof KuduTable) {
+    if (insertStmt.getTargetTable() instanceof KuduTable) {
+      if (inputFragment.getDataPartition().getType() == TPartitionType.KUDU) {
+        Preconditions.checkState(
+            inputFragment.getDataPartition().getPartitionExprs().size() == 1);
+        // Only sort for Kudu if we've already partitioned so that we can sort the
+        // partitions separately. This will be true if this is a distributed exec.
+        orderingExprs.add(inputFragment.getDataPartition().getPartitionExprs().get(0));
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
-      } else {
-        orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
       }
+    } else if (insertStmt.hasClusteredHint()) {
+      orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
     }
     orderingExprs.addAll(insertStmt.getSortByExprs());
     // Ignore constants for the sake of clustering.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 8595eea..13e79c3 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -119,8 +119,6 @@ public abstract class TableSink extends DataSink {
     } else if (table instanceof KuduTable) {
       // Kudu doesn't have a way to perform INSERT OVERWRITE.
       Preconditions.checkState(overwrite == false);
-      // Partition clauses don't make sense for Kudu inserts.
-      Preconditions.checkState(partitionKeyExprs.isEmpty());
       // sortby() hint is not supported for Kudu tables.
       Preconditions.checkState(sortByColumns.isEmpty());
       return new KuduTableSink(table, sinkAction, referencedColumns);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index c6a71e3..88297fe 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1736,8 +1736,14 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs and Kudu 
tables: " +
+          "INSERT hints are only supported for inserting into Hdfs tables: " +
           "functional_hbase.alltypes");
+      // Plan hints do not make sense for inserting into Kudu tables.
+      AnalysisError(String.format(
+          "insert into table functional_kudu.alltypes %sshuffle%s " +
+          "select * from functional_kudu.alltypes", prefix, suffix),
+          "INSERT hints are only supported for inserting into Hdfs tables: " +
+          "functional_kudu.alltypes");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +
@@ -1791,14 +1797,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "partition (year, month) %ssortby(year)%s select * from " +
           "functional.alltypes", prefix, suffix),
           "SORTBY hint column list must not contain Hdfs partition column: 
'year'");
-      // Column in sortby hint must not be a Kudu primary key column.
-      AnalysisError(String.format("insert into functional_kudu.alltypessmall " 
+
-          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, 
suffix),
-          "SORTBY hint column list must not contain Kudu primary key column: 
'id'");
-      // sortby() hint is not supported in UPSERT queries
-      AnalysisError(String.format("upsert into functional_kudu.alltypessmall " 
+
-          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, 
suffix),
-          "SORTBY hint is not supported in UPSERT statements.");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
index 118b322..2a20c2a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -60,12 +60,9 @@ public class AnalyzeUpsertStmtTest extends AnalyzerTest {
         "from functional.alltypes a, functional.allcomplextypes b, " +
         "(select item from b.int_array_col) v1 " +
         "where a.id = b.id");
-    // Hint
-    AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * 
from " +
-        "functional_kudu.testtbl");
-    // Incorrect hint, results in warning
-    AnalyzesOk("upsert into table functional_kudu.testtbl [badhint] select * 
from " +
-        "functional_kudu.testtbl", "INSERT hint not recognized: badhint");
+    // Hint, not supported for Kudu tables.
+    AnalysisError("upsert into table functional_kudu.testtbl [clustered] 
select * from " +
+        "functional_kudu.testtbl", "Hints not supported in UPSERT 
statements.");
 
     // Key columns missing from permutation
     AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index bbcb014..e04278d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -7,6 +7,16 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=1/24 files=1 size=20.36KB
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+02:SORT
+|  order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(bigint_col))]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=1/24 files=1 size=20.36KB
 ====
 # simple upsert with values clause
 upsert into table functional_kudu.testtbl
@@ -41,6 +51,11 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
+08:SORT
+|  order by: KuduPartition(a.bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
+|
+07:EXCHANGE [KUDU(KuduPartition(a.bigint_col))]
+|
 03:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.string_col = string_col
 |  runtime filters: RF000 <- string_col
@@ -79,14 +94,99 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: id, string_col
+|  having: CAST(count(*) AS INT) < 10
+|
+02:EXCHANGE [HASH(id,string_col)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: id, string_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
 ====
-upsert into functional_kudu.testtbl /*+ clustered */
+upsert into functional_kudu.testtbl
 select * from functional_kudu.testtbl
 ---- PLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
+00:SCAN KUDU [functional_kudu.testtbl]
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+02:SORT
+|  order by: KuduPartition(functional_kudu.testtbl.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(functional_kudu.testtbl.id))]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====
+# upsert with a union
+upsert into functional_kudu.testtbl select * from functional_kudu.testtbl where id % 3 = 0
+union all select * from functional_kudu.testtbl where id % 3 = 1
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:UNION
+|
+|--02:SCAN KUDU [functional_kudu.testtbl]
+|     predicates: id % 3 = 1
+|
+01:SCAN KUDU [functional_kudu.testtbl]
+   predicates: id % 3 = 0
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+04:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+03:EXCHANGE [KUDU(KuduPartition(id))]
+|
+00:UNION
+|
+|--02:SCAN KUDU [functional_kudu.testtbl]
+|     predicates: id % 3 = 1
+|
+01:SCAN KUDU [functional_kudu.testtbl]
+   predicates: id % 3 = 0
+====
+# upsert with agg on col that is already partitioned in the input and target table
+# TODO: we shouldn't need to do any repartitioning here (IMPALA-5254).
+upsert into functional_kudu.alltypes
+select id, true, 0, 0, 0, 0, 0, 0, '', '', '', 0, 0 from functional_kudu.alltypes group by id
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:AGGREGATE [FINALIZE]
+|  group by: id
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
+|
+03:AGGREGATE [FINALIZE]
+|  group by: id
+|
+02:EXCHANGE [HASH(id)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: id
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+====

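For reference, a hypothetical query of roughly the shape of the first test case
above (the exact test query is not shown in the hunk; table and column names are
taken from the surrounding tests), with the distributed plan these tests now
expect: an exchange that routes each row by its Kudu partition number, then a
sort on the partition expression and the key so the sink writes each tablet's
rows in order.

  upsert into table functional_kudu.testtbl(id, name)
  select bigint_col, string_col from functional.alltypes;

  -- DISTRIBUTEDPLAN (shape as verified above):
  --   UPSERT INTO KUDU [functional_kudu.testtbl]
  --   02:SORT      order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
  --   01:EXCHANGE  [KUDU(KuduPartition(bigint_col))]
  --   00:SCAN HDFS [functional.alltypes]
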
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index a978b8b..12ab9fc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -50,6 +50,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+02:SORT
+|  order by: KuduPartition(10) ASC NULLS LAST, 10 ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(10))]
+|
 00:UNION
    constant-operands=1
 ====
@@ -61,6 +66,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+02:SORT
+|  order by: KuduPartition(int_col) ASC NULLS LAST, int_col ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(int_col))]
+|
 00:SCAN KUDU [functional_kudu.tinyinttable]
 ====
 insert into functional_kudu.testtbl(id, name)
@@ -80,6 +90,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+06:SORT
+|  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
+|
+05:EXCHANGE [KUDU(KuduPartition(count(id)))]
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(id)
 |  group by: name
@@ -240,26 +255,23 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes]
    kudu predicates: id < 1475059775, id > 1475059665
 ====
-# IMPALA-2521: clustered insert into table adds sort node.
-insert into table functional_kudu.alltypes /*+ clustered */
+insert into table functional_kudu.alltypes
 select * from functional_kudu.alltypes
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
-|
 00:SCAN KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
+02:SORT
+|  order by: KuduPartition(functional_kudu.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(functional_kudu.alltypes.id))]
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ====
-# IMPALA-2521: clustered insert into table adds sort node, correctly substituting exprs.
-insert into table functional_kudu.testtbl /*+ clustered */
+insert into table functional_kudu.testtbl
 select id, name, maxzip as zip
 from (
 select id, max(zip) as maxzip, name
@@ -268,9 +280,6 @@ from functional_kudu.testtbl group by id, name
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
-|  order by: id ASC NULLS LAST
-|
 01:AGGREGATE [FINALIZE]
 |  output: max(zip)
 |  group by: id, name
@@ -279,8 +288,10 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-04:SORT
-|  order by: id ASC NULLS LAST
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
 |
 03:AGGREGATE [FINALIZE]
 |  output: max:merge(zip)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index f2b12b1..71224ce 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -278,42 +278,6 @@ NumModifiedRows: 1
 NumRowErrors: 7299
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table.
-create table impala_2521
-(id bigint primary key, name string, zip int)
-partition by hash partitions 3 stored as kudu
----- RESULTS
-====
----- QUERY
-insert into impala_2521 /*+ clustered */
-select id, name, maxzip as zip
-from (
-select tinyint_col as id, cast(max(int_col) + 1 as int) as maxzip, string_col as name
-from functional_kudu.alltypessmall group by id, name
-) as sub;
----- RESULTS
-: 10
----- RUNTIME_PROFILE
-NumModifiedRows: 10
-NumRowErrors: 0
-====
----- QUERY
-select * from impala_2521
----- RESULTS
-0,'0',1
-1,'1',2
-2,'2',3
-3,'3',4
-4,'4',5
-5,'5',6
-6,'6',7
-7,'7',8
-8,'8',9
-9,'9',10
----- TYPES
-BIGINT,STRING,INT
-====
----- QUERY
 # Table with all supported types as primary key and distribution columns
 create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name string,
   valf float, vald double, primary key (i1, i2, i3, i4, name)) partition by
