IMPALA-2523: Make HdfsTableSink aware of clustered input

IMPALA-2521 introduced clustering for insert statements. This change
makes the HdfsTableSink aware of clustered inputs, so that partitions
are opened, written, and closed one by one.
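
As an aside, the core idea can be illustrated with a minimal stand-alone
C++ sketch (made-up Row/Partition types, not the actual HdfsTableSink and
OutputPartition classes): because clustered input is ordered by the
partition key exprs, a change in key means the previous partition is
complete, so its file can be finalized and closed before the next
partition is opened.

  #include <iostream>
  #include <string>
  #include <vector>

  // Stand-in for a row; each row carries its evaluated partition key.
  struct Row { std::string partition_key; int value; };

  // Stand-in for an output partition with at most one open file.
  struct Partition {
    std::string key;
    void Open() { std::cout << "open " << key << "\n"; }
    void Write(const Row& r) { std::cout << "  write " << r.value << "\n"; }
    void Close() { std::cout << "close " << key << "\n"; }
  };

  // Clustered path: on a key change, close the previous partition before
  // opening the next one, so only one partition is ever open.
  void WriteClusteredBatch(const std::vector<Row>& batch, Partition* current) {
    for (const Row& row : batch) {
      if (row.partition_key != current->key) {
        current->Close();
        *current = Partition{row.partition_key};
        current->Open();
      }
      current->Write(row);
    }
  }

  int main() {
    // The input must be clustered (sorted) by the partition key.
    std::vector<Row> batch = {{"year=2009/month=1", 1},
                              {"year=2009/month=1", 2},
                              {"year=2009/month=2", 3}};
    Partition current{batch[0].partition_key};
    current.Open();
    WriteClusteredBatch(batch, &current);
    current.Close();  // finalize the last partition
    return 0;
  }

Without clustering, the sink instead keeps a file open for every distinct
partition key it has seen until the sink itself is closed.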

This change also adds/modifies tests in several ways:

- clustered insert tests now select all rows from alltypes instead of
  alltypessmall. Together with varying batch_size settings, this
  results in a larger number of row batches being written.
- clustered insert tests select from alltypes instead of
  functional.alltypes to make sure we also select from various input
  formats.
- clustered insert tests have been added that select from alltypestiny,
  creating inserts with 1 and 2 rows per partition, respectively.
- exhaustive insert tests now use different values for batch_size: 1,
  16, and 0 (meaning the default, 1024). This is limited to uncompressed
  parquet files to maintain a reasonable runtime. On my machine,
  executing test_insert took 1778 seconds, compared to 1002 seconds with
  just the default row batch size.
- Additional tests in test_insert_behaviour.py make sure that inserting
  over several row batches creates only one file per partition.
- It renames the test_insert method to make it unique in the file and
  allow for effective filtering with -k.
- It adds tests to the Analyzer test suite.

Change-Id: Ibeda0bdabbfe44c8ac95bf7c982a75649e1b82d0
Reviewed-on: http://gerrit.cloudera.org:8080/4863
Reviewed-by: Lars Volker <[email protected]>
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8ea21d09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8ea21d09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8ea21d09

Branch: refs/heads/master
Commit: 8ea21d099fde57bd358cc073017bdcf80c8d74ca
Parents: 178fd59
Author: Lars Volker <[email protected]>
Authored: Tue Oct 25 17:33:27 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Nov 22 02:51:20 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hbase-table-sink.cc                 |   2 +-
 be/src/exec/hbase-table-writer.cc               |   8 +-
 be/src/exec/hbase-table-writer.h                |   4 +-
 be/src/exec/hdfs-avro-table-writer.cc           |   4 +-
 be/src/exec/hdfs-avro-table-writer.h            |   5 +-
 be/src/exec/hdfs-parquet-table-writer.cc        |   4 +-
 be/src/exec/hdfs-parquet-table-writer.h         |   5 +-
 be/src/exec/hdfs-sequence-table-writer.cc       |   5 +-
 be/src/exec/hdfs-sequence-table-writer.h        |   5 +-
 be/src/exec/hdfs-table-sink.cc                  | 232 ++++++++++++-------
 be/src/exec/hdfs-table-sink.h                   |  56 +++--
 be/src/exec/hdfs-table-writer.cc                |   1 +
 be/src/exec/hdfs-table-writer.h                 |  21 +-
 be/src/exec/hdfs-text-table-writer.cc           |   5 +-
 be/src/exec/hdfs-text-table-writer.h            |   4 +-
 bin/impala-config.sh                            |   4 +
 common/thrift/DataSinks.thrift                  |  15 +-
 .../org/apache/impala/analysis/DeleteStmt.java  |   2 +-
 .../org/apache/impala/analysis/InsertStmt.java  |   2 +-
 .../org/apache/impala/analysis/UpdateStmt.java  |   2 +-
 .../apache/impala/planner/HdfsTableSink.java    |   9 +-
 .../org/apache/impala/planner/TableSink.java    |   4 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |  16 ++
 .../queries/QueryTest/insert.test               |  85 ++++++-
 tests/query_test/test_insert.py                 |  10 +-
 tests/query_test/test_insert_behaviour.py       | 108 ++++++++-
 26 files changed, 457 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index a89384e..9c5998b 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -89,7 +89,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
-  RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
+  RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));
   (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
       batch->num_rows();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hbase-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc
index 41ea184..7574adc 100644
--- a/be/src/exec/hbase-table-writer.cc
+++ b/be/src/exec/hbase-table-writer.cc
@@ -114,7 +114,7 @@ Status HBaseTableWriter::InitJNI() {
   return Status::OK();
 }
 
-Status HBaseTableWriter::AppendRowBatch(RowBatch* batch) {
+Status HBaseTableWriter::AppendRows(RowBatch* batch) {
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Error getting JNIEnv.");
 
@@ -270,9 +270,9 @@ void HBaseTableWriter::Close(RuntimeState* state) {
     table_.reset();
   }
 
-  // The jni should already have everything cleaned at this point
-  // but try again just in case there was an error that caused
-  // AppendRowBatch to exit out before calling CleanUpJni.
+  // The jni should already have everything cleaned at this point but try again just in
+  // case there was an error that caused AppendRows() to exit out before calling
+  // CleanUpJni.
   Status status = CleanUpJni();
   if (!status.ok()) {
     stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hbase-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.h b/be/src/exec/hbase-table-writer.h
index 94947c9..bc414ab 100644
--- a/be/src/exec/hbase-table-writer.h
+++ b/be/src/exec/hbase-table-writer.h
@@ -41,13 +41,13 @@ class RowBatch;
 ///    HBaseTableWriter::InitJni();
 ///    writer = new HBaseTableWriter(state, table_desc_, output_exprs_);
 ///    writer.Init(state);
-///    writer.AppendRowBatch(batch);
+///    writer.AppendRows(batch);
 class HBaseTableWriter {
  public:
   HBaseTableWriter(HBaseTableDescriptor* table_desc,
                    const std::vector<ExprContext*>& output_expr_ctxs,
                    RuntimeProfile* profile);
-  Status AppendRowBatch(RowBatch* batch);
+  Status AppendRows(RowBatch* batch);
 
   /// Calls to Close release the HBaseTable.
   void Close(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 1934304..ec0ee08 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -175,8 +175,8 @@ void HdfsAvroTableWriter::Close() {
   mem_pool_->FreeAll();
 }
 
-Status HdfsAvroTableWriter::AppendRowBatch(RowBatch* batch,
-    const vector<int32_t>& row_group_indices, bool* new_file) {
+Status HdfsAvroTableWriter::AppendRows(
+    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
   int32_t limit;
   bool all_rows = row_group_indices.empty();
   if (all_rows) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index aeff0ed..38e820e 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -75,9 +75,8 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
 
   /// Outputs the given rows into an HDFS sequence file. The rows are buffered
   /// to fill a sequence file block.
-  virtual Status AppendRowBatch(RowBatch* rows,
-                                const std::vector<int32_t>& row_group_indices,
-                                bool* new_file);
+  virtual Status AppendRows(
+      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
 
  private:
   /// Processes a single row, appending to out_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index cc708ab..7ae55b7 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -872,8 +872,8 @@ Status HdfsParquetTableWriter::InitNewFile() {
   return Status::OK();
 }
 
-Status HdfsParquetTableWriter::AppendRowBatch(RowBatch* batch,
-    const vector<int32_t>& row_group_indices, bool* new_file) {
+Status HdfsParquetTableWriter::AppendRows(
+    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
   SCOPED_TIMER(parent_->encode_timer());
   *new_file = false;
   int limit;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 581160d..4e16707 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -67,9 +67,8 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   virtual Status InitNewFile();
 
   /// Appends parquet representation of rows in the batch to the current file.
-  virtual Status AppendRowBatch(RowBatch* batch,
-                                const std::vector<int32_t>& row_group_indices,
-                                bool* new_file);
+  virtual Status AppendRows(
+      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
 
   /// Write out all the data.
   virtual Status Finalize();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 99910e2..3c522ba 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -91,9 +91,8 @@ Status HdfsSequenceTableWriter::Init() {
   return Status::OK();
 }
 
-Status HdfsSequenceTableWriter::AppendRowBatch(RowBatch* batch,
-                                           const vector<int32_t>& row_group_indices,
-                                           bool* new_file) {
+Status HdfsSequenceTableWriter::AppendRows(
+    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
   int32_t limit;
   if (row_group_indices.empty()) {
     limit = batch->num_rows();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index c94ab1c..7f6a888 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -57,9 +57,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
 
   /// Outputs the given rows into an HDFS sequence file. The rows are buffered
   /// to fill a sequence file block.
-  virtual Status AppendRowBatch(RowBatch* rows,
-                                const std::vector<int32_t>& row_group_indices,
-                                bool* new_file);
+  virtual Status AppendRows(
+      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
 
  private:
   /// processes a single row, delegates to Compress or NoCompress ConsumeRow().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 8c74797..3a9725c 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -56,26 +56,30 @@ const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
 HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
-    const vector<TExpr>& select_list_texprs,
-    const TDataSink& tsink)
-    :  DataSink(row_desc),
-       table_desc_(NULL),
-       default_partition_(NULL),
-       current_row_(NULL),
-       table_id_(tsink.table_sink.target_table_id),
-       skip_header_line_count_(
-           tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count
-           ? tsink.table_sink.hdfs_table_sink.skip_header_line_count : 0),
-       select_list_texprs_(select_list_texprs),
-       partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
-       overwrite_(tsink.table_sink.hdfs_table_sink.overwrite) {
+    const vector<TExpr>& select_list_texprs, const TDataSink& tsink)
+  : DataSink(row_desc),
+    table_desc_(nullptr),
+    default_partition_(nullptr),
+    table_id_(tsink.table_sink.target_table_id),
+    skip_header_line_count_(
+        tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ?
+            tsink.table_sink.hdfs_table_sink.skip_header_line_count :
+            0),
+    select_list_texprs_(select_list_texprs),
+    partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
+    overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
+    input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
+    current_clustered_partition_(nullptr) {
   DCHECK(tsink.__isset.table_sink);
 }
 
 OutputPartition::OutputPartition()
-    : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0),
-      partition_descriptor(NULL), block_size(0) {
-}
+  : hdfs_connection(nullptr),
+    tmp_hdfs_file(nullptr),
+    num_rows(0),
+    num_files(0),
+    partition_descriptor(nullptr),
+    block_size(0) {}
 
 Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
   // Prepare select list expressions.
@@ -125,7 +129,7 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   table_desc_ = static_cast<const HdfsTableDescriptor*>(
       state->desc_tbl().GetTableDescriptor(table_id_));
 
-  if (table_desc_ == NULL) {
+  if (table_desc_ == nullptr) {
     stringstream error_msg("Failed to get table descriptor for table id: ");
     error_msg << table_id_;
     return Status(error_msg.str());
@@ -192,14 +196,17 @@ Status HdfsTableSink::Open(RuntimeState* state) {
          // partition keys. So only keep a reference to the partition which matches
          // partition_key_values for constant values, since only that is written to.
           void* table_partition_key_value =
-              partition->partition_key_value_ctxs()[i]->GetValue(NULL);
-          void* target_partition_key_value = partition_key_expr_ctxs_[i]->GetValue(NULL);
-          if (table_partition_key_value == NULL && target_partition_key_value == NULL) {
+              partition->partition_key_value_ctxs()[i]->GetValue(nullptr);
+          void* target_partition_key_value =
+              partition_key_expr_ctxs_[i]->GetValue(nullptr);
+          if (table_partition_key_value == nullptr
+              && target_partition_key_value == nullptr) {
             continue;
           }
-          if (table_partition_key_value == NULL || target_partition_key_value == NULL
+          if (table_partition_key_value == nullptr
+              || target_partition_key_value == nullptr
              || !RawValue::Eq(table_partition_key_value, target_partition_key_value,
-                               partition_key_expr_ctxs_[i]->root()->type())) {
+                     partition_key_expr_ctxs_[i]->root()->type())) {
             relevant_partition = false;
             break;
           }
@@ -207,17 +214,16 @@ Status HdfsTableSink::Open(RuntimeState* state) {
       }
       if (relevant_partition) {
         string key;
-        // It's ok if current_row_ is NULL (which it should be here), since all of these
-        // expressions are constant, and can therefore be evaluated without a valid row
-        // context.
-        GetHashTblKey(dynamic_partition_key_value_ctxs, &key);
+        // Pass nullptr as row, since all of these expressions are constant, and can
+        // therefore be evaluated without a valid row context.
+        GetHashTblKey(nullptr, dynamic_partition_key_value_ctxs, &key);
        DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end())
             << "Partitions with duplicate 'static' keys found during INSERT";
         partition_descriptor_map_[key] = partition;
       }
     }
   }
-  if (default_partition_ == NULL) {
+  if (default_partition_ == nullptr) {
     return Status("No default partition found for HdfsTextTableSink");
   }
   return Status::OK();
@@ -262,6 +268,87 @@ void HdfsTableSink::BuildHdfsFileNames(
   output_partition->num_files = 0;
 }
 
+Status HdfsTableSink::WriteRowsToPartition(
+    RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) {
+  // The rows of this batch may span multiple files. We repeatedly pass the row batch to
+  // the writer until it sets new_file to false, indicating that all rows have been
+  // written. The writer tracks where it is in the batch when it returns with new_file
+  // set.
+  bool new_file;
+  while (true) {
+    OutputPartition* output_partition = partition_pair->first;
+    RETURN_IF_ERROR(
+        output_partition->writer->AppendRows(batch, partition_pair->second, &new_file));
+    if (!new_file) break;
+    RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
+    RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
+  }
+  partition_pair->second.clear();
+  return Status::OK();
+}
+
+Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) {
+  DCHECK_GT(batch->num_rows(), 0);
+  DCHECK(!dynamic_partition_key_expr_ctxs_.empty());
+  DCHECK(input_is_clustered_);
+
+  // Initialize the clustered partition and key.
+  if (current_clustered_partition_ == nullptr) {
+    const TupleRow* current_row = batch->GetRow(0);
+    GetHashTblKey(
+        current_row, dynamic_partition_key_expr_ctxs_, &current_clustered_partition_key_);
+    RETURN_IF_ERROR(GetOutputPartition(state, current_row,
+        current_clustered_partition_key_, &current_clustered_partition_, false));
+  }
+
+  // Compare the last row of the batch to the last current partition key. If they match,
+  // then all the rows in the batch have the same key and can be written as a whole.
+  string last_row_key;
+  GetHashTblKey(batch->GetRow(batch->num_rows() - 1), dynamic_partition_key_expr_ctxs_,
+      &last_row_key);
+  if (last_row_key == current_clustered_partition_key_) {
+    DCHECK(current_clustered_partition_->second.empty());
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+    return Status::OK();
+  }
+
+  // Not all rows in this batch match the previously written partition key, so we process
+  // them individually.
+  for (int i = 0; i < batch->num_rows(); ++i) {
+    const TupleRow* current_row = batch->GetRow(i);
+
+    string key;
+    GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key);
+
+    if (current_clustered_partition_key_ != key) {
+      DCHECK(current_clustered_partition_ != nullptr);
+      // Done with previous partition - write rows and close.
+      if (!current_clustered_partition_->second.empty()) {
+        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+        current_clustered_partition_->second.clear();
+      }
+      RETURN_IF_ERROR(FinalizePartitionFile(state, current_clustered_partition_->first));
+      if (current_clustered_partition_->first->writer.get() != nullptr) {
+        current_clustered_partition_->first->writer->Close();
+      }
+      partition_keys_to_output_partitions_.erase(current_clustered_partition_key_);
+      current_clustered_partition_key_ = std::move(key);
+      RETURN_IF_ERROR(GetOutputPartition(state, current_row,
+          current_clustered_partition_key_, &current_clustered_partition_, false));
+    }
+#ifdef DEBUG
+    string debug_row_key;
+    GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &debug_row_key);
+    DCHECK_EQ(current_clustered_partition_key_, debug_row_key);
+#endif
+    DCHECK(current_clustered_partition_ != nullptr);
+    current_clustered_partition_->second.push_back(i);
+  }
+  // Write final set of rows to the partition but keep its file open.
+  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+  return Status::OK();
+}
+
 Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
     OutputPartition* output_partition) {
   SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
@@ -295,7 +382,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
       tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
 
   VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
-  if (output_partition->tmp_hdfs_file == NULL) {
+  if (output_partition->tmp_hdfs_file == nullptr) {
     return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
         output_partition->current_file_name));
   }
@@ -310,7 +397,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
     // files, we get the block size by stat-ing the file.
     hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
         output_partition->current_file_name.c_str());
-    if (info == NULL) {
+    if (info == nullptr) {
       return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS 
file: ",
           output_partition->current_file_name));
     }
@@ -338,16 +425,16 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
 }
 
 Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
-    const HdfsPartitionDescriptor& partition_descriptor,
+    const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
     OutputPartition* output_partition, bool empty_partition) {
   // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
   // etc.
   stringstream partition_name_ss;
   for (int j = 0; j < partition_key_expr_ctxs_.size(); ++j) {
     partition_name_ss << table_desc_->col_descs()[j].name() << "=";
-    void* value = partition_key_expr_ctxs_[j]->GetValue(current_row_);
-    // NULL partition keys get a special value to be compatible with Hive.
-    if (value == NULL) {
+    void* value = partition_key_expr_ctxs_[j]->GetValue(row);
+    // nullptr partition keys get a special value to be compatible with Hive.
+    if (value == nullptr) {
       partition_name_ss << table_desc_->null_partition_key_value();
     } else {
       string value_str;
@@ -362,7 +449,7 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
       // decoded partition key value.
       string encoded_str;
       UrlEncode(value_str, &encoded_str, true);
-      // If the string is empty, map it to NULL (mimicking Hive's behaviour)
+      // If the string is empty, map it to nullptr (mimicking Hive's behaviour)
       partition_name_ss << (encoded_str.empty() ?
                         table_desc_->null_partition_key_value() : encoded_str);
     }
@@ -459,24 +546,25 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
   return CreateNewTmpFile(state, output_partition);
 }
 
-void HdfsTableSink::GetHashTblKey(const vector<ExprContext*>& ctxs, string* key) {
+void HdfsTableSink::GetHashTblKey(
+    const TupleRow* row, const vector<ExprContext*>& ctxs, string* key) {
   stringstream hash_table_key;
   for (int i = 0; i < ctxs.size(); ++i) {
     RawValue::PrintValueAsBytes(
-        ctxs[i]->GetValue(current_row_), ctxs[i]->root()->type(), &hash_table_key);
+        ctxs[i]->GetValue(row), ctxs[i]->root()->type(), &hash_table_key);
     // Additionally append "/" to avoid accidental key collisions.
     hash_table_key << "/";
   }
   *key = hash_table_key.str();
 }
 
-inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
+inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const TupleRow* row,
     const string& key, PartitionPair** partition_pair, bool no_more_rows) {
+  DCHECK(row != nullptr || key == ROOT_PARTITION_KEY);
   PartitionMap::iterator existing_partition;
   existing_partition = partition_keys_to_output_partitions_.find(key);
   if (existing_partition == partition_keys_to_output_partitions_.end()) {
-    // Create a new OutputPartition, and add it to
-    // partition_keys_to_output_partitions.
+    // Create a new OutputPartition, and add it to partition_keys_to_output_partitions.
     const HdfsPartitionDescriptor* partition_descriptor = default_partition_;
    PartitionDescriptorMap::const_iterator it = partition_descriptor_map_.find(key);
     if (it != partition_descriptor_map_.end()) {
@@ -484,13 +572,13 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     }
 
     OutputPartition* partition = state->obj_pool()->Add(new OutputPartition());
-    Status status = InitOutputPartition(state, *partition_descriptor, partition,
-        no_more_rows);
+    Status status =
+        InitOutputPartition(state, *partition_descriptor, row, partition, no_more_rows);
     if (!status.ok()) {
       // We failed to create the output partition successfully. Clean it up now
       // as it is not added to partition_keys_to_output_partitions_ so won't be
       // cleaned up in Close().
-      if (partition->writer.get() != NULL) partition->writer->Close();
+      if (partition->writer.get() != nullptr) partition->writer->Close();
       return status;
     }
 
@@ -533,45 +621,25 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
     // If there are no dynamic keys just use an empty key.
     PartitionPair* partition_pair;
     RETURN_IF_ERROR(
-        GetOutputPartition(state, ROOT_PARTITION_KEY, &partition_pair, false));
-    // Pass the row batch to the writer. If new_file is returned true then the current
-    // file is finalized and a new file is opened.
-    // The writer tracks where it is in the batch when it returns with new_file set.
-    OutputPartition* output_partition = partition_pair->first;
-    bool new_file;
-    do {
-      RETURN_IF_ERROR(output_partition->writer->AppendRowBatch(
-          batch, partition_pair->second, &new_file));
-      if (new_file) {
-        RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
-        RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
-      }
-    } while (new_file);
+        GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &partition_pair, false));
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair));
+  } else if (input_is_clustered_) {
+    RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
   } else {
     for (int i = 0; i < batch->num_rows(); ++i) {
-      current_row_ = batch->GetRow(i);
+      const TupleRow* current_row = batch->GetRow(i);
 
       string key;
-      GetHashTblKey(dynamic_partition_key_expr_ctxs_, &key);
-      PartitionPair* partition_pair = NULL;
-      RETURN_IF_ERROR(GetOutputPartition(state, key, &partition_pair, false));
+      GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key);
+      PartitionPair* partition_pair = nullptr;
+      RETURN_IF_ERROR(
+          GetOutputPartition(state, current_row, key, &partition_pair, false));
       partition_pair->second.push_back(i);
     }
-    for (PartitionMap::iterator partition = partition_keys_to_output_partitions_.begin();
-         partition != partition_keys_to_output_partitions_.end(); ++partition) {
-      OutputPartition* output_partition = partition->second.first;
-      if (partition->second.second.empty()) continue;
-
-      bool new_file;
-      do {
-        RETURN_IF_ERROR(output_partition->writer->AppendRowBatch(
-            batch, partition->second.second, &new_file));
-        if (new_file) {
-          RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
-          RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
-        }
-      } while (new_file);
-      partition->second.second.clear();
+    for (PartitionMap::value_type& partition : partition_keys_to_output_partitions_) {
+      if (!partition.second.second.empty()) {
+        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &partition.second));
+      }
     }
   }
   return Status::OK();
@@ -579,11 +647,11 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
 
 Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
                                             OutputPartition* partition) {
-  if (partition->tmp_hdfs_file == NULL && !overwrite_) return Status::OK();
+  if (partition->tmp_hdfs_file == nullptr && !overwrite_) return Status::OK();
   SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
 
-  // OutputPartition writer could be NULL if there is no row to output.
-  if (partition->writer.get() != NULL) {
+  // OutputPartition writer could be nullptr if there is no row to output.
+  if (partition->writer.get() != nullptr) {
     RETURN_IF_ERROR(partition->writer->Finalize());
 
     // Track total number of appended rows per partition in runtime
@@ -604,10 +672,10 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
 
 Status HdfsTableSink::ClosePartitionFile(
     RuntimeState* state, OutputPartition* partition) {
-  if (partition->tmp_hdfs_file == NULL) return Status::OK();
+  if (partition->tmp_hdfs_file == nullptr) return Status::OK();
   int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file);
   VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
-  partition->tmp_hdfs_file = NULL;
+  partition->tmp_hdfs_file = nullptr;
   ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1);
   if (hdfs_ret != 0) {
     return Status(ErrorMsg(TErrorCode::GENERAL,
@@ -625,7 +693,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
     // Make sure we create an output partition even if the input is empty because we need
     // it to delete the existing data for 'insert overwrite'.
     PartitionPair* dummy;
-    RETURN_IF_ERROR(GetOutputPartition(state, ROOT_PARTITION_KEY, &dummy, true));
+    RETURN_IF_ERROR(GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &dummy, true));
   }
 
   // Close Hdfs files, and update stats in runtime state.
@@ -646,7 +714,7 @@ void HdfsTableSink::Close(RuntimeState* state) {
           partition_keys_to_output_partitions_.begin();
       cur_partition != partition_keys_to_output_partitions_.end();
       ++cur_partition) {
-    if (cur_partition->second.first->writer.get() != NULL) {
+    if (cur_partition->second.first->writer.get() != nullptr) {
       cur_partition->second.first->writer->Close();
     }
    Status close_status = ClosePartitionFile(state, cur_partition->second.first);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index ae32270..22b6a44 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -165,10 +165,11 @@ class HdfsTableSink : public DataSink {
 
  private:
   /// Initialises the filenames of a given output partition, and opens the temporary file.
-  /// If the partition will not have any rows added to it, empty_partition must be true.
+  /// The partition key is derived from 'row'. If the partition will not have any rows
+  /// added to it, empty_partition must be true.
   Status InitOutputPartition(RuntimeState* state,
-                             const HdfsPartitionDescriptor& partition_descriptor,
-                             OutputPartition* output_partition, bool empty_partition);
+      const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
+      OutputPartition* output_partition, bool empty_partition);
 
   /// Add a temporary file to an output partition.  Files are created in a
   /// temporary directory and then moved to the real partition directory by the
@@ -177,24 +178,23 @@ class HdfsTableSink : public DataSink {
   /// If this function fails, the tmp file is cleaned up.
  Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition);
 
-  /// Key is the concatenation of the evaluated
-  /// dynamic_partition_key_exprs_ generated by GetHashTblKey().
-  /// Maps to an OutputPartition, which are owned by the object pool and
-  /// a vector of rows to insert into this partition from the current row batch.
+  /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by
+  /// GetHashTblKey(). Maps to an OutputPartition, which are owned by the object pool, and
+  /// a vector of indices of the rows in the current batch to insert into the partition.
   typedef std::pair<OutputPartition*, std::vector<int32_t>> PartitionPair;
   typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
 
-
-  /// Generates string key for hash_tbl_ as a concatenation
-  /// of all evaluated exprs, evaluated against current_row_.
-  /// The generated string is much shorter than the full Hdfs file name.
-  void GetHashTblKey(const std::vector<ExprContext*>& ctxs, std::string* key);
+  /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
+  /// evaluated against 'row'. The generated string is much shorter than the full Hdfs
+  /// file name.
+  void GetHashTblKey(
+      const TupleRow* row, const std::vector<ExprContext*>& ctxs, std::string* key);
 
   /// Given a hashed partition key, get the output partition structure from
-  /// the partition_keys_to_output_partitions_.
-  /// no_more_rows indicates that no more rows will be added to the partition.
-  Status GetOutputPartition(RuntimeState* state, const std::string& key,
-      PartitionPair** partition_pair, bool no_more_rows);
+  /// the 'partition_keys_to_output_partitions_'. 'no_more_rows' indicates that no more
+  /// rows will be added to the partition.
+  Status GetOutputPartition(RuntimeState* state, const TupleRow* row,
+      const std::string& key, PartitionPair** partition_pair, bool no_more_rows);
 
   /// Initialise and prepare select and partition key expressions
   Status PrepareExprs(RuntimeState* state);
@@ -206,6 +206,15 @@ class HdfsTableSink : public DataSink {
   void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
       OutputPartition* output);
 
+  /// Writes all rows referenced by the row index vector in 'partition_pair' to the
+  /// partition's writer and clears the row index vector afterwards.
+  Status WriteRowsToPartition(
+      RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair);
+
+  /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs
+  /// files. The input must be ordered by the partition key expressions.
+  Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch);
+
   /// Updates runtime stats of HDFS with rows written, then closes the file associated
   /// with the partition by calling ClosePartitionFile()
   Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition);
@@ -230,9 +239,6 @@ class HdfsTableSink : public DataSink {
   /// Exprs that materialize output values
   std::vector<ExprContext*> output_expr_ctxs_;
 
-  /// Current row from the current RowBatch to output
-  TupleRow* current_row_;
-
   /// Table id resolved in Prepare() to set tuple_desc_;
   TableId table_id_;
 
@@ -255,6 +261,18 @@ class HdfsTableSink : public DataSink {
   /// Indicates whether the existing partitions should be overwritten.
   bool overwrite_;
 
+  /// Indicates whether the input is ordered by the partition keys, meaning partitions can
+  /// be opened, written, and closed one by one.
+  bool input_is_clustered_;
+
+  /// Stores the current partition during clustered inserts across subsequent row batches.
+  /// Only set if 'input_is_clustered_' is true.
+  PartitionPair* current_clustered_partition_;
+
+  /// Stores the current partition key during clustered inserts across subsequent row
+  /// batches. Only set if 'input_is_clustered_' is true.
+  std::string current_clustered_partition_key_;
+
   /// The directory in which to write intermediate results. Set to
   /// <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare()
   std::string staging_dir_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index 71e9278..48349e3 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -39,6 +39,7 @@ HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent,
 
 Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) {
   DCHECK_GE(len, 0);
+  DCHECK(output_->tmp_hdfs_file != nullptr);
  int ret = hdfsWrite(output_->hdfs_connection, output_->tmp_hdfs_file, data, len);
   if (ret == -1) {
     string error_msg = GetHdfsErrorMsg("");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index b3d344f..8066315 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -52,7 +52,7 @@ class HdfsTableWriter {
   /// The sequence of calls to this object are:
   /// 1. Init()
   /// 2. InitNewFile()
-  /// 3. AppendRowBatch() - called repeatedly
+  /// 3. AppendRows() - called repeatedly
   /// 4. Finalize()
   /// For files formats that are splittable (and therefore can be written to an
   /// arbitrarily large file), 1-4 is called once.
@@ -65,17 +65,14 @@ class HdfsTableWriter {
   /// Called when a new file is started.
   virtual Status InitNewFile() = 0;
 
-  /// Appends the current batch of rows to the partition.  If there are multiple
-  /// partitions then row_group_indices will contain the rows that are for this
-  /// partition, otherwise all rows in the batch are appended.
-  /// If the current file is full, the writer stops appending and
-  /// returns with *new_file == true.  A new file will be opened and
-  /// the same row batch will be passed again.  The writer must track how
-  /// much of the batch it had already processed asking for a new file.
-  /// Otherwise the writer will return with *newfile == false.
-  virtual Status AppendRowBatch(RowBatch* batch,
-                                const std::vector<int32_t>& row_group_indices,
-                                bool* new_file) = 0;
+  /// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
+  /// and if the latter is empty, appends every row.
+  /// If the current file is full, the writer stops appending and returns with
+  /// *new_file == true. A new file will be opened and the same row batch will be passed
+  /// again. The writer must track how much of the batch it had already processed asking
+  /// for a new file. Otherwise the writer will return with *newfile == false.
+  virtual Status AppendRows(
+      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
 
   /// Finalize this partition. The writer needs to finish processing
   /// all data have written out after the return from this call.
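
For illustration, here is a hypothetical toy writer honoring the
AppendRows()/new_file contract described above (stand-in types, not the
real HdfsTableWriter interface): it "fills up" after a fixed number of
rows, remembers its position in the batch, and expects to be called again
with the same batch once the caller has opened a new file.

  #include <cstdint>
  #include <vector>

  class ToyWriter {
   public:
    // Appends rows selected by 'row_group_indices' (all rows if empty).
    // Sets *new_file = true when the current file is full; the caller then
    // finalizes the file, opens a new one, and passes the same batch again.
    void AppendRows(const std::vector<int>& batch,
        const std::vector<int32_t>& row_group_indices, bool* new_file) {
      size_t limit =
          row_group_indices.empty() ? batch.size() : row_group_indices.size();
      while (next_row_ < limit) {
        if (rows_in_file_ == kFileCapacity) {
          *new_file = true;   // stop here; next_row_ keeps our position
          rows_in_file_ = 0;
          return;
        }
        size_t row = row_group_indices.empty() ?
            next_row_ : static_cast<size_t>(row_group_indices[next_row_]);
        (void)batch[row];     // "write" the selected row
        ++rows_in_file_;
        ++next_row_;
      }
      next_row_ = 0;          // batch fully consumed
      *new_file = false;
    }

   private:
    static const size_t kFileCapacity = 1024;
    size_t next_row_ = 0;     // position within the current batch
    size_t rows_in_file_ = 0; // rows written to the current file
  };

This mirrors the loop in HdfsTableSink::WriteRowsToPartition() above, which
keeps calling AppendRows() and rotates to a new file until new_file comes
back false.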

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-text-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index 053c821..cba4032 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -99,9 +99,8 @@ string HdfsTextTableWriter::file_extension() const {
   return compressor_->file_extension();
 }
 
-Status HdfsTextTableWriter::AppendRowBatch(RowBatch* batch,
-                                           const vector<int32_t>& row_group_indices,
-                                           bool* new_file) {
+Status HdfsTextTableWriter::AppendRows(
+    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
   int32_t limit;
   if (row_group_indices.empty()) {
     limit = batch->num_rows();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/be/src/exec/hdfs-text-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h
index 1d65f11..2944f23 100644
--- a/be/src/exec/hdfs-text-table-writer.h
+++ b/be/src/exec/hdfs-text-table-writer.h
@@ -60,8 +60,8 @@ class HdfsTextTableWriter : public HdfsTableWriter {
   /// Appends delimited string representation of the rows in the batch to output partition.
   /// The resulting output is buffered until HDFS_FLUSH_WRITE_SIZE before being written
   /// to HDFS.
-  Status AppendRowBatch(RowBatch* current_row,
-                        const std::vector<int32_t>& row_group_indices, bool* new_file);
+  Status AppendRows(RowBatch* current_row, const std::vector<int32_t>& row_group_indices,
+      bool* new_file);
 
  private:
   /// Escapes occurrences of field_delim_ and escape_char_ with escape_char_ and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index caae5e9..834f216 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -197,6 +197,10 @@ elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then
   # isilon manages its own replication.
   export HDFS_REPLICATION=1
 elif [ "${TARGET_FILESYSTEM}" = "local" ]; then
+  if [[ "${WAREHOUSE_LOCATION_PREFIX}" = "" ]]; then
+    echo "WAREHOUSE_LOCATION_PREFIX cannot be an empty string for local 
filesystem"
+    return 1
+  fi
   if [ ! -d "${WAREHOUSE_LOCATION_PREFIX}" ]; then
     echo "'$WAREHOUSE_LOCATION_PREFIX' is not a directory on the local 
filesystem."
     return 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 2a57304..0b136b2 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -62,10 +62,14 @@ struct THdfsTableSink {
   1: required list<Exprs.TExpr> partition_key_exprs
   2: required bool overwrite
 
-  /// The 'skip.header.line.count' property of the target Hdfs table. We will insert this
-  /// many empty lines at the beginning of new text files, which will be skipped by the
-  /// scanners while reading from the files.
+  // The 'skip.header.line.count' property of the target Hdfs table. We will insert this
+  // many empty lines at the beginning of new text files, which will be skipped by the
+  // scanners while reading from the files.
   3: optional i32 skip_header_line_count
+
+  // This property indicates to the table sink whether the input is ordered by the
+  // partition keys, meaning partitions can be opened, written, and closed one by one.
+  4: required bool input_is_clustered
 }
 
 // Structure to encapsulate specific options that are passed down to the 
KuduTableSink
@@ -73,7 +77,10 @@ struct TKuduTableSink {
   // The position in this vector is equal to the position in the output expressions of the
   // sink and holds the index of the corresponsding column in the Kudu schema,
   // e.g. 'exprs[i]' references 'kudu_table.column(referenced_cols[i])'
-  1: optional list<i32> referenced_columns;
+  1: optional list<i32> referenced_columns
+
+  // Defines if duplicate or not found keys should be ignored
+  2: optional bool ignore_not_found_or_duplicate
 }
 
 // Sink to create the build side of a JoinNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 9d84baa..2f7f670 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -56,7 +56,7 @@ public class DeleteStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        ImmutableList.<Expr>of(), referencedColumns_, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 87f7cef..5528da9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -793,7 +793,7 @@ public class InsertStmt extends StatementBase {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedColumns_, overwrite_);
+        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 9a1bc9e..de74bd8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -65,7 +65,7 @@ public class UpdateStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), referencedColumns_, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index bb06b5e..fc7f9b1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -46,12 +46,17 @@ public class HdfsTableSink extends TableSink {
   // Whether to overwrite the existing partition(s).
   protected final boolean overwrite_;
 
+  // Indicates whether the input is ordered by the partition keys, meaning partitions can
+  // be opened, written, and closed one by one.
+  protected final boolean inputIsClustered_;
+
   public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs,
-      boolean overwrite) {
+      boolean overwrite, boolean inputIsClustered) {
     super(targetTable, Op.INSERT);
     Preconditions.checkState(targetTable instanceof HdfsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
+    inputIsClustered_ = inputIsClustered;
   }
 
   @Override
@@ -140,7 +145,7 @@ public class HdfsTableSink extends TableSink {
   protected TDataSink toThrift() {
     TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
     THdfsTableSink hdfsTableSink = new THdfsTableSink(
-        Expr.treesToThrift(partitionKeyExprs_), overwrite_);
+        Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_);
     HdfsTable table = (HdfsTable) targetTable_;
     StringBuilder error = new StringBuilder();
     int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 742e6c9..fb3cea2 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -90,13 +90,13 @@ public abstract class TableSink extends DataSink {
    */
   public static TableSink create(Table table, Op sinkAction,
       List<Expr> partitionKeyExprs,  List<Integer> referencedColumns,
-      boolean overwrite) {
+      boolean overwrite, boolean inputIsClustered) {
     if (table instanceof HdfsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
-      return new HdfsTableSink(table, partitionKeyExprs, overwrite);
+      return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered);
     } else if (table instanceof HBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index cacdd83..11f6842 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1750,6 +1750,22 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "partition (year, month) /* +shuffle,noshuffle */ " +
           "select * from functional.alltypes",
           "Conflicting INSERT hints: shuffle and noshuffle");
+
+      // Test clustered hint.
+      AnalyzesOk(String.format(
+          "insert into functional.alltypessmall partition (year, month) 
%sclustered%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format(
+          "insert into table functional.alltypesnopart %sclustered%s " +
+          "select * from functional.alltypesnopart", prefix, suffix));
+      AnalyzesOk(String.format(
+          "insert into table functional.alltypesnopart %snoclustered%s " +
+          "select * from functional.alltypesnopart", prefix, suffix));
+      // Conflicting clustered hints.
+      AnalysisError(String.format(
+          "insert into table functional.alltypessmall partition (year, month) 
" +
+          "/* +clustered,noclustered */ select * from functional.alltypes", 
prefix,
+          suffix), "Conflicting INSERT hints: clustered and noclustered");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index b0c5c7c..5601b35 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -861,14 +861,87 @@ Memory limit exceeded
 ---- QUERY
 # IMPALA-2521: clustered insert into table
 insert into table alltypesinsert
-partition (year, month) /*+ clustered */
-select * from functional.alltypessmall;
+partition (year, month) /*+ clustered,shuffle */
+select * from alltypes;
 ---- SETUP
 DROP PARTITIONS alltypesinsert
 RESET alltypesinsert
 ---- RESULTS
-year=2009/month=1/: 25
-year=2009/month=2/: 25
-year=2009/month=3/: 25
-year=2009/month=4/: 25
+year=2009/month=1/: 310
+year=2009/month=10/: 310
+year=2009/month=11/: 300
+year=2009/month=12/: 310
+year=2009/month=2/: 280
+year=2009/month=3/: 310
+year=2009/month=4/: 300
+year=2009/month=5/: 310
+year=2009/month=6/: 300
+year=2009/month=7/: 310
+year=2009/month=8/: 310
+year=2009/month=9/: 300
+year=2010/month=1/: 310
+year=2010/month=10/: 310
+year=2010/month=11/: 300
+year=2010/month=12/: 310
+year=2010/month=2/: 280
+year=2010/month=3/: 310
+year=2010/month=4/: 300
+year=2010/month=5/: 310
+year=2010/month=6/: 300
+year=2010/month=7/: 310
+year=2010/month=8/: 310
+year=2010/month=9/: 300
+====
+---- QUERY
+# IMPALA-2521: clustered insert into table
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,shuffle */
+select * from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 2
+year=2009/month=2/: 2
+year=2009/month=3/: 2
+year=2009/month=4/: 2
+====
+---- QUERY
+# IMPALA-2521: clustered insert into table
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,noshuffle */
+select * from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 2
+year=2009/month=2/: 2
+year=2009/month=3/: 2
+year=2009/month=4/: 2
+====
+---- QUERY
+# IMPALA-2521: clustered insert into table
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,shuffle */
+select * from alltypestiny where int_col = 0;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 1
+year=2009/month=2/: 1
+year=2009/month=3/: 1
+year=2009/month=4/: 1
+====
+---- QUERY
+# IMPALA-2521: clustered, unpartitioned insert into table
+insert into table alltypesnopart_insert
+ /*+ clustered,shuffle */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col, timestamp_col from alltypessmall;
+---- SETUP
+RESET alltypesnopart_insert
+---- RESULTS
+: 100
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/tests/query_test/test_insert.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index e5e74ed..d195050 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -55,7 +55,7 @@ class TestInsertQueries(ImpalaTestSuite):
      cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
     else:
       cls.TestMatrix.add_dimension(create_exec_option_dimension(
-          cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
+          cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0, 1, 16],
           sync_ddl=[0, 1]))
       cls.TestMatrix.add_dimension(TestDimension("compression_codec", 
*PARQUET_CODECS));
       # Insert is currently only supported for text and parquet
@@ -68,6 +68,12 @@ class TestInsertQueries(ImpalaTestSuite):
             v.get_value('compression_codec') == 'none'))
       cls.TestMatrix.add_constraint(lambda v:\
           v.get_value('table_format').compression_codec == 'none')
+      # Only test other batch sizes for uncompressed parquet to keep the execution time
+      # within reasonable bounds.
+      cls.TestMatrix.add_constraint(lambda v:\
+          v.get_value('exec_option')['batch_size'] == 0 or \
+            (v.get_value('table_format').file_format == 'parquet' and \
+            v.get_value('compression_codec') == 'none'))
 
   def test_insert_large_string(self, vector, unique_database):
     """Test handling of large strings in inserter and scanner."""
@@ -103,7 +109,7 @@ class TestInsertQueries(ImpalaTestSuite):
     super(TestInsertQueries, cls).setup_class()
 
   @pytest.mark.execute_serially
-  def test_insert(self, vector):
+  def test_insert_test(self, vector):
     if (vector.get_value('table_format').file_format == 'parquet'):
       vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
           vector.get_value('compression_codec')
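
The new constraint prunes the test-vector cross product instead of enumerating the
allowed combinations by hand: a vector survives if it uses the default batch size, or
if it targets uncompressed parquet. A rough sketch of that filtering idea over a
simplified vector model (the dimension names here are illustrative; the real
ImpalaTestSuite matrix API differs):

from itertools import product

batch_sizes = [0, 1, 16]
table_formats = [("parquet", "none"), ("parquet", "snappy"), ("text", "none")]

vectors = [{"batch_size": b, "file_format": f, "codec": c}
           for b, (f, c) in product(batch_sizes, table_formats)]

# Non-default batch sizes run only against uncompressed parquet, which keeps
# the exhaustive matrix (and hence test runtime) within reasonable bounds.
kept = [v for v in vectors
        if v["batch_size"] == 0
        or (v["file_format"] == "parquet" and v["codec"] == "none")]

Of the nine combinations above, five survive: all three formats at batch_size=0, plus
uncompressed parquet at batch sizes 1 and 16.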

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ea21d09/tests/query_test/test_insert_behaviour.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py
index 53c9dbf..1584343 100644
--- a/tests/query_test/test_insert_behaviour.py
+++ b/tests/query_test/test_insert_behaviour.py
@@ -23,11 +23,14 @@ import pytest
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
-from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3
 
 @SkipIfLocal.hdfs_client
 class TestInsertBehaviour(ImpalaTestSuite):
   """Tests for INSERT behaviour that isn't covered by checking query results"""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
 
   TEST_DB_NAME = "insert_empty_result_db"
 
@@ -473,3 +476,106 @@ class TestInsertBehaviour(ImpalaTestSuite):
         "other::---".format(groups[0]))
     self.execute_query_expect_success(self.client, "REFRESH " + table)
     self.execute_query_expect_failure(self.client, insert_query)
+
+  def test_clustered_partition_single_file(self, unique_database):
+    """IMPALA-2523: Tests that clustered insert creates one file per 
partition, even when
+    inserting over multiple row batches."""
+    # On s3 this test takes about 220 seconds and we are unlikely to break it, so only run
+    # it in exhaustive strategy.
+    if self.exploration_strategy() != 'exhaustive' and IS_S3:
+      pytest.skip("only runs in exhaustive")
+    table = "{0}.insert_clustered".format(unique_database)
+    table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database)
+    table_location = get_fs_path("/" + table_path)
+
+    create_stmt = """create table {0} like functional.alltypes""".format(table)
+    self.execute_query_expect_success(self.client, create_stmt)
+
+    set_location_stmt = """alter table {0} set location '{1}'""".format(
+        table, table_location)
+    self.execute_query_expect_success(self.client, set_location_stmt)
+
+    # Setting a lower batch size will result in multiple row batches being written.
+    self.execute_query_expect_success(self.client, "set batch_size=10")
+
+    insert_stmt = """insert into {0} partition(year, month) /*+ 
clustered,shuffle */
+                     select * from functional.alltypes""".format(table)
+    self.execute_query_expect_success(self.client, insert_stmt)
+
+    # We expect exactly one file per (year, month) partition, since subsequent row
+    # batches of a partition will be written into the same file.
+    expected_partitions = \
+        ["year=%s/month=%s" % (y, m) for y in [2009, 2010] for m in 
range(1,13)]
+
+    for partition in expected_partitions:
+      partition_path = "{0}/{1}".format(table_path, partition)
+      files = self.filesystem_client.ls(partition_path)
+      assert len(files) == 1, "%s: %s" % (partition, files)
+
+  def test_clustered_partition_multiple_files(self, unique_database):
+    """IMPALA-2523: Tests that clustered insert creates the right number of 
files per
+    partition when inserting over multiple row batches."""
+    # This test takes about 30 seconds and we are unlikely to break it, so only run it in
+    # exhaustive strategy.
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip("only runs in exhaustive")
+    table = "{0}.insert_clustered".format(unique_database)
+    table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database)
+    table_location = get_fs_path("/" + table_path)
+
+    create_stmt = """create table {0} (
+                     l_orderkey BIGINT,
+                     l_partkey BIGINT,
+                     l_suppkey BIGINT,
+                     l_linenumber INT,
+                     l_quantity DECIMAL(12,2),
+                     l_extendedprice DECIMAL(12,2),
+                     l_discount DECIMAL(12,2),
+                     l_tax DECIMAL(12,2),
+                     l_linestatus STRING,
+                     l_shipdate STRING,
+                     l_commitdate STRING,
+                     l_receiptdate STRING,
+                     l_shipinstruct STRING,
+                     l_shipmode STRING,
+                     l_comment STRING)
+                     partitioned by (l_returnflag STRING) stored as parquet
+                     """.format(table)
+    self.execute_query_expect_success(self.client, create_stmt)
+
+    set_location_stmt = """alter table {0} set location '{1}'""".format(
+        table, table_location)
+    self.execute_query_expect_success(self.client, set_location_stmt)
+
+    # Setting a lower parquet file size will result in multiple files being written.
+    self.execute_query_expect_success(self.client, "set 
parquet_file_size=10485760")
+
+    insert_stmt = """insert into {0} partition(l_returnflag) /*+ 
clustered,shuffle */
+        select l_orderkey,
+               l_partkey,
+               l_suppkey,
+               l_linenumber,
+               l_quantity,
+               l_extendedprice,
+               l_discount,
+               l_tax,
+               l_linestatus,
+               l_shipdate,
+               l_commitdate,
+               l_receiptdate,
+               l_shipinstruct,
+               l_shipmode,
+               l_comment,
+               l_returnflag
+               from tpch_parquet.lineitem""".format(table)
+    self.execute_query_expect_success(self.client, insert_stmt)
+
+    expected_partition_files = [("l_returnflag=A", 3, 30),
+                                ("l_returnflag=N", 3, 30),
+                                ("l_returnflag=R", 3, 30)]
+
+    for partition, min_files, max_files in expected_partition_files:
+      partition_path = "{0}/{1}".format(table_path, partition)
+      files = self.filesystem_client.ls(partition_path)
+      assert min_files <= len(files) and len(files) <= max_files, \
+          "%s: %s" % (partition, files)
