IMPALA-5384, part 1: introduce DmlExecState

This change is based on a patch by Marcel Kornacker.

Move data structures that collect DML operation stats from the
RuntimeState and Coordinator into a new DmlExecState class, which
has its own lock.  This removes a dependency on the coordinator's
lock, which will allow further coordinator locking cleanup in the next
patch.

Change-Id: Id4c025917620a7bff2acbeb46464f107ab4b7565
Reviewed-on: http://gerrit.cloudera.org:8080/9793
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/408ee4d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/408ee4d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/408ee4d6

Branch: refs/heads/master
Commit: 408ee4d68cbb7217eba4b020064a1e878e412ce8
Parents: b78daed
Author: Dan Hecht <[email protected]>
Authored: Fri Mar 23 16:28:27 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Mar 29 06:29:16 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc         |   1 +
 be/src/exec/catalog-op-executor.cc          |   1 +
 be/src/exec/data-sink.cc                    |  63 ---
 be/src/exec/data-sink.h                     |   9 -
 be/src/exec/hbase-table-sink.cc             |  11 +-
 be/src/exec/hdfs-table-sink.cc              |  38 +-
 be/src/exec/kudu-table-sink.cc              |  22 +-
 be/src/exec/plan-root-sink.cc               |   1 +
 be/src/runtime/CMakeLists.txt               |   1 +
 be/src/runtime/coordinator-backend-state.cc |  11 +-
 be/src/runtime/coordinator-backend-state.h  |  18 +-
 be/src/runtime/coordinator.cc               | 361 ++---------------
 be/src/runtime/coordinator.h                |  84 +---
 be/src/runtime/dml-exec-state.cc            | 494 +++++++++++++++++++++++
 be/src/runtime/dml-exec-state.h             | 149 +++++++
 be/src/runtime/query-state.cc               |  13 +-
 be/src/runtime/runtime-filter-bank.cc       |   1 +
 be/src/runtime/runtime-state.h              |  29 +-
 be/src/service/client-request-state.cc      |  10 +-
 be/src/service/client-request-state.h       |   1 +
 be/src/service/impala-beeswax-server.cc     |  18 +-
 be/src/service/impala-hs2-server.cc         |   1 +
 be/src/service/impala-http-handler.cc       |   2 +
 be/src/service/impala-server.cc             |   2 +
 be/src/service/impala-server.h              |   3 -
 be/src/testutil/in-process-servers.cc       |   1 +
 26 files changed, 750 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc 
b/be/src/benchmarks/expr-benchmark.cc
index b10a70f..1b995a5 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -53,6 +53,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "service/fe-support.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc 
b/be/src/exec/catalog-op-executor.cc
index 12398cf..164187c 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -25,6 +25,7 @@
 #include "runtime/lib-cache.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f8f068e..9140b3e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -121,69 +121,6 @@ Status DataSink::Init(const vector<TExpr>& 
thrift_output_exprs,
   return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, 
&output_exprs_);
 }
 
-void DataSink::MergeDmlStats(const TInsertStats& src_stats,
-    TInsertStats* dst_stats) {
-  dst_stats->bytes_written += src_stats.bytes_written;
-  if (src_stats.__isset.kudu_stats) {
-    if (!dst_stats->__isset.kudu_stats) 
dst_stats->__set_kudu_stats(TKuduDmlStats());
-    if (!dst_stats->kudu_stats.__isset.num_row_errors) {
-      dst_stats->kudu_stats.__set_num_row_errors(0);
-    }
-    dst_stats->kudu_stats.__set_num_row_errors(
-        dst_stats->kudu_stats.num_row_errors + 
src_stats.kudu_stats.num_row_errors);
-  }
-  if (src_stats.__isset.parquet_stats) {
-    if (dst_stats->__isset.parquet_stats) {
-      MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
-          &dst_stats->parquet_stats.per_column_size);
-    } else {
-      dst_stats->__set_parquet_stats(src_stats.parquet_stats);
-    }
-  }
-}
-
-string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
-    const string& prefix) {
-  const char* indent = "  ";
-  stringstream ss;
-  ss << prefix;
-  bool first = true;
-  for (const PartitionStatusMap::value_type& val: stats) {
-    if (!first) ss << endl;
-    first = false;
-    ss << "Partition: ";
-
-    const string& partition_key = val.first;
-    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) 
{
-      ss << "Default" << endl;
-    } else {
-      ss << partition_key << endl;
-    }
-    if (val.second.__isset.num_modified_rows) {
-      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
-    }
-
-    if (!val.second.__isset.stats) continue;
-    const TInsertStats& stats = val.second.stats;
-    if (stats.__isset.kudu_stats) {
-      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
-    }
-
-    ss << indent << "BytesWritten: "
-       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
-    if (stats.__isset.parquet_stats) {
-      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
-      ss << endl << indent << "Per Column Sizes:";
-      for (map<string, int64_t>::const_iterator i = 
parquet_stats.per_column_size.begin();
-           i != parquet_stats.per_column_size.end(); ++i) {
-        ss << endl << indent << indent << i->first << ": "
-           << PrettyPrinter::Print(i->second, TUnit::BYTES);
-      }
-    }
-  }
-  return ss.str();
-}
-
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != nullptr);
   DCHECK(profile_ != nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d2e80af..605b46d 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,15 +85,6 @@ class DataSink {
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
 
-  /// Merges one update to the DML stats for a partition. dst_stats will have 
the
-  /// combined stats of src_stats and dst_stats after this method returns.
-  static void MergeDmlStats(const TInsertStats& src_stats,
-      TInsertStats* dst_stats);
-
-  /// Outputs the DML stats contained in the map of partition updates to a 
string
-  static std::string OutputDmlStats(const PartitionStatusMap& stats,
-      const std::string& prefix = "");
-
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 9e591ac..567e8bd 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -59,12 +59,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, 
MemTracker* parent_mem_track
   RETURN_IF_ERROR(hbase_table_writer_->Init(state));
 
   // Add a 'root partition' status in which to collect insert statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.__set_id(-1L);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, 
root_status));
-
+  state->dml_exec_state()->AddPartition(ROOT_PARTITION_KEY, -1L, nullptr);
   return Status::OK();
 }
 
@@ -74,8 +69,8 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* 
batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
-      batch->num_rows();
+  state->dml_exec_state()->UpdatePartition(
+      ROOT_PARTITION_KEY, batch->num_rows(), nullptr);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index c12b711..ca08bac 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -403,8 +403,9 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   COUNTER_ADD(files_created_counter_, 1);
 
   if (!ShouldSkipStaging(state, output_partition)) {
-    // Save the ultimate destination for this file (it will be moved by the 
coordinator)
-    (*state->hdfs_files_to_move())[output_partition->current_file_name] = 
final_location;
+    // Save the ultimate destination for this file (it will be moved by the 
coordinator).
+    state->dml_exec_state()->AddFileToMove(
+        output_partition->current_file_name, final_location);
   }
 
   ++output_partition->num_files;
@@ -573,21 +574,15 @@ inline Status 
HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
       return status;
     }
 
-    // Save the partition name so that the coordinator can create the 
partition directory
-    // structure if needed
-    DCHECK(state->per_partition_status()->find(partition->partition_name) ==
-        state->per_partition_status()->end());
-    TInsertPartitionStatus partition_status;
-    partition_status.__set_num_modified_rows(0L);
-    partition_status.__set_id(partition_descriptor->id());
-    partition_status.__set_stats(TInsertStats());
-    partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
-    state->per_partition_status()->insert(
-        make_pair(partition->partition_name, partition_status));
+    // Save the partition name so that the coordinator can create the partition
+    // directory structure if needed.
+    state->dml_exec_state()->AddPartition(
+        partition->partition_name, partition_descriptor->id(),
+        &table_desc_->hdfs_base_dir());
 
     if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
-      // Indicate that temporary directory is to be deleted after execution
-      (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
+      // Indicate that temporary directory is to be deleted after execution.
+      state->dml_exec_state()->AddFileToMove(partition->tmp_hdfs_dir_name, "");
     }
 
     partition_keys_to_output_partitions_[key].first = std::move(partition);
@@ -643,17 +638,8 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* 
state,
   // OutputPartition writer could be nullptr if there is no row to output.
   if (partition->writer.get() != nullptr) {
     RETURN_IF_ERROR(partition->writer->Finalize());
-
-    // Track total number of appended rows per partition in runtime
-    // state. partition->num_rows counts number of rows appended is per-file.
-    PartitionStatusMap::iterator it =
-        state->per_partition_status()->find(partition->partition_name);
-
-    // Should have been created in GetOutputPartition() when the partition was
-    // initialised.
-    DCHECK(it != state->per_partition_status()->end());
-    it->second.num_modified_rows += partition->num_rows;
-    DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
+    state->dml_exec_state()->UpdatePartition(
+        partition->partition_name, partition->num_rows, 
&partition->writer->stats());
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 05f1d06..67bb86e 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,6 @@ using kudu::client::KuduError;
 
 namespace impala {
 
-const static string& ROOT_PARTITION_KEY =
-    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
@@ -92,15 +89,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tracke
       << "TableDescriptor must be an instance KuduTableDescriptor.";
   table_desc_ = static_cast<const KuduTableDescriptor*>(table_desc);
 
-  // Add a 'root partition' status in which to collect write statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_id(-1L);
-  TKuduDmlStats kudu_dml_stats;
-  kudu_dml_stats.__set_num_row_errors(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.stats.__set_kudu_stats(kudu_dml_stats);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, 
root_status));
+  state->dml_exec_state()->InitForKuduDml();
 
   // Add counters
   total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
@@ -338,12 +327,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
     VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
   }
   Status status = CheckForErrors(state);
-  TInsertPartitionStatus& insert_status =
-      (*state->per_partition_status())[ROOT_PARTITION_KEY];
-  insert_status.__set_num_modified_rows(
-      total_rows_->value() - num_row_errors_->value());
-  
insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
-  
insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
+  state->dml_exec_state()->SetKuduDmlStats(
+      total_rows_->value() - num_row_errors_->value(), 
num_row_errors_->value(),
+      client_->GetLatestObservedTimestamp());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 1cdc544..836a376 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -117,6 +117,7 @@ void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
   // No guarantee that FlushFinal() has been called, so need to mark 
sender_done_ here as
   // well.
+  // TODO: shouldn't this also set eos to true? do we need both eos and 
sender_done_?
   sender_done_ = true;
   consumer_cv_.NotifyAll();
   // Wait for consumer to be done, in case sender tries to tear-down this sink 
while the

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0d4b61c..89cbbb9 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Runtime
   data-stream-sender.cc
   debug-options.cc
   descriptors.cc
+  dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
   hbase-table.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index 914a3e4..e8db00e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -43,10 +43,7 @@ Coordinator::BackendState::BackendState(
     const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type 
filter_mode)
   : query_id_(query_id),
     state_idx_(state_idx),
-    filter_mode_(filter_mode),
-    rpc_latency_(0),
-    rpc_sent_(false),
-    peak_consumption_(0L) {
+    filter_mode_(filter_mode) {
 }
 
 void Coordinator::BackendState::Init(
@@ -413,11 +410,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
     const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
     ObjectPool* obj_pool)
   : exec_params_(exec_params),
-    profile_(nullptr),
-    done_(false),
-    profile_created_(false),
-    total_split_size_(0),
-    total_ranges_complete_(0) {
+    profile_(nullptr) {
   const string& profile_name = Substitute("Instance $0 (host=$1)",
       PrintId(exec_params.instance_id), 
lexical_cast<string>(exec_params.host));
   profile_ = RuntimeProfile::Create(obj_pool, profile_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h 
b/be/src/runtime/coordinator-backend-state.h
index 0973ca3..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -161,24 +161,24 @@ class Coordinator::BackendState {
     int64_t last_report_time_ms_ = 0;
 
     /// owned by coordinator object pool provided in the c'tor, created in 
Update()
-    RuntimeProfile* profile_;
+    RuntimeProfile* profile_ = nullptr;
 
     /// true if the final report has been received for the fragment instance.
     /// Used to handle duplicate done ReportExecStatus RPC messages. Used only
     /// in ApplyExecStatusReport()
-    bool done_;
+    bool done_ = false;
 
     /// true after the first call to profile->Update()
-    bool profile_created_;
+    bool profile_created_ = false;
 
     /// cumulative size of all splits; set in c'tor
-    int64_t total_split_size_;
+    int64_t total_split_size_ = 0;
 
     /// wall clock timer for this instance
     MonotonicStopWatch stopwatch_;
 
     /// total scan ranges complete across all scan nodes
-    int64_t total_ranges_complete_;
+    int64_t total_ranges_complete_ = 0;
 
     /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
     std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
@@ -215,7 +215,7 @@ class Coordinator::BackendState {
   boost::mutex lock_;
 
   // number of in-flight instances
-  int num_remaining_instances_;
+  int num_remaining_instances_ = 0;
 
   /// If the status indicates an error status, execution has either been 
aborted by the
   /// executing impalad (which then reported the error) or cancellation has 
been
@@ -235,15 +235,15 @@ class Coordinator::BackendState {
   ErrorLogMap error_log_;
 
   /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
-  int64_t rpc_latency_;
+  int64_t rpc_latency_ = 0;
 
   /// If true, ExecPlanFragment() rpc has been sent - even if it was not 
determined to be
   /// successful.
-  bool rpc_sent_;
+  bool rpc_sent_ = false;
 
   /// peak memory used for this query (value of that node's query memtracker's
   /// peak_consumption()
-  int64_t peak_consumption_;
+  int64_t peak_consumption_ = 0;
 
   /// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
   int64_t last_report_time_ms_ = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a5d53b2..e6b3bca 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -63,10 +63,7 @@ using std::unique_ptr;
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
-DEFINE_bool(insert_inherit_permissions, false, "If true, new directories 
created by "
-    "INSERTs will inherit the permissions of their parent directories");
-
-namespace impala {
+using namespace impala;
 
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
@@ -93,9 +90,6 @@ Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
   DCHECK(request.plan_exec_info.size() > 0);
 
-  needs_finalization_ = request.__isset.finalize_params;
-  if (needs_finalization_) finalize_params_ = request.finalize_params;
-
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
              << " stmt=" << request.query_ctx.client_request.stmt;
   stmt_type_ = request.stmt_type;
@@ -489,291 +483,32 @@ Status Coordinator::UpdateStatus(const Status& status, 
const string& backend_hos
   return query_status_;
 }
 
-void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& 
path_str,
-    PermissionCache* permissions_cache) {
-  // Find out if the path begins with a hdfs:// -style prefix, and remove it 
and the
-  // location (e.g. host:port) if so.
-  int scheme_end = path_str.find("://");
-  string stripped_str;
-  if (scheme_end != string::npos) {
-    // Skip past the subsequent location:port/ prefix.
-    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
-  } else {
-    stripped_str = path_str;
-  }
-
-  // Get the list of path components, used to build all path prefixes.
-  vector<string> components;
-  split(components, stripped_str, is_any_of("/"));
-
-  // Build a set of all prefixes (including the complete string) of 
stripped_path. So
-  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
-  vector<string> prefixes;
-  // Stores the current prefix
-  stringstream accumulator;
-  for (const string& component: components) {
-    if (component.empty()) continue;
-    accumulator << "/" << component;
-    prefixes.push_back(accumulator.str());
-  }
-
-  // Now for each prefix, stat() it to see if a) it exists and b) if so what 
its
-  // permissions are. When we meet a directory that doesn't exist, we record 
the fact that
-  // we need to create it, and the permissions of its parent dir to inherit.
-  //
-  // Every prefix is recorded in the PermissionCache so we don't do more than 
one stat()
-  // for each path. If we need to create the directory, we record it as the 
pair (true,
-  // perms) so that the caller can identify which directories need their 
permissions
-  // explicitly set.
-
-  // Set to the permission of the immediate parent (i.e. the permissions to 
inherit if the
-  // current dir doesn't exist).
-  short permissions = 0;
-  for (const string& path: prefixes) {
-    PermissionCache::const_iterator it = permissions_cache->find(path);
-    if (it == permissions_cache->end()) {
-      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
-      if (info != nullptr) {
-        // File exists, so fill the cache with its current permissions.
-        permissions_cache->insert(
-            make_pair(path, make_pair(false, info->mPermissions)));
-        permissions = info->mPermissions;
-        hdfsFreeFileInfo(info, 1);
-      } else {
-        // File doesn't exist, so we need to set its permissions to its 
immediate parent
-        // once it's been created.
-        permissions_cache->insert(make_pair(path, make_pair(true, 
permissions)));
-      }
-    } else {
-      permissions = it->second.second;
-    }
-  }
-}
-
-Status Coordinator::FinalizeSuccessfulInsert() {
-  PermissionCache permissions_cache;
-  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
-  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
-
-  // INSERT finalization happens in the five following steps
-  // 1. If OVERWRITE, remove all the files in the target directory
-  // 2. Create all the necessary partition directories.
-  HdfsTableDescriptor* hdfs_table;
-  RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx_.desc_tbl,
-      finalize_params_.table_id, obj_pool(), &hdfs_table));
-  DCHECK(hdfs_table != nullptr)
-      << "INSERT target table not known in descriptor table: "
-      << finalize_params_.table_id;
-
-  // Loop over all partitions that were updated by this insert, and create the 
set of
-  // filesystem operations required to create the correct partition structure 
on disk.
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) 
{
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, 
"Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    // INSERT allows writes to tables that have partitions on multiple 
filesystems.
-    // So we need to open connections to different filesystems as necessary. 
We use a
-    // local connection cache and populate it with one connection per 
filesystem that the
-    // partitions are on.
-    hdfsFS partition_fs_connection;
-    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      partition.second.partition_base_dir, &partition_fs_connection,
-          &filesystem_connection_cache));
-
-    // Look up the partition in the descriptor table.
-    stringstream part_path_ss;
-    if (partition.second.id == -1) {
-      // If this is a non-existant partition, use the default partition 
location of
-      // <base_dir>/part_key_1=val/part_key_2=val/...
-      part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
-    } else {
-      HdfsPartitionDescriptor* part = 
hdfs_table->GetPartition(partition.second.id);
-      DCHECK(part != nullptr)
-          << "table_id=" << hdfs_table->id() << " partition_id=" << 
partition.second.id
-          << "\n" <<  PrintThrift(runtime_state()->instance_ctx());
-      part_path_ss << part->location();
-    }
-    const string& part_path = part_path_ss.str();
-    bool is_s3_path = IsS3APath(part_path.c_str());
-
-    // If this is an overwrite insert, we will need to delete any updated 
partitions
-    if (finalize_params_.is_overwrite) {
-      if (partition.first.empty()) {
-        // If the root directory is written to, then the table must not be 
partitioned
-        DCHECK(per_partition_status_.size() == 1);
-        // We need to be a little more careful, and only delete data files in 
the root
-        // because the tmp directories the sink(s) wrote are there also.
-        // So only delete files in the table directory - all files are treated 
as data
-        // files by Hive and Impala, but directories are ignored (and may 
legitimately
-        // be used to store permanent non-table data by other applications).
-        int num_files = 0;
-        // hfdsListDirectory() only sets errno if there is an error, but it 
doesn't set
-        // it to 0 if the call succeed. When there is no error, errno could be 
any
-        // value. So need to clear errno before calling it.
-        // Once HDFS-8407 is fixed, the errno reset won't be needed.
-        errno = 0;
-        hdfsFileInfo* existing_files =
-            hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
-        if (existing_files == nullptr && errno == EAGAIN) {
-          errno = 0;
-          existing_files =
-              hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
-        }
-        // hdfsListDirectory() returns nullptr not only when there is an error 
but also
-        // when the directory is empty(HDFS-8407). Need to check errno to make 
sure
-        // the call fails.
-        if (existing_files == nullptr && errno != 0) {
-          return Status(GetHdfsErrorMsg("Could not list directory: ", 
part_path));
-        }
-        for (int i = 0; i < num_files; ++i) {
-          const string filename = 
path(existing_files[i].mName).filename().string();
-          if (existing_files[i].mKind == kObjectKindFile && 
!IsHiddenFile(filename)) {
-            partition_create_ops.Add(DELETE, existing_files[i].mName);
-          }
-        }
-        hdfsFreeFileInfo(existing_files, num_files);
-      } else {
-        // This is a partition directory, not the root directory; we can delete
-        // recursively with abandon, after checking that it ever existed.
-        // TODO: There's a potential race here between checking for the 
directory
-        // and a third-party deleting it.
-        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-          // There is no directory structure in S3, so "inheriting" 
permissions is not
-          // possible.
-          // TODO: Try to mimic inheriting permissions for S3.
-          PopulatePathPermissionCache(
-              partition_fs_connection, part_path, &permissions_cache);
-        }
-        // S3 doesn't have a directory structure, so we technically wouldn't 
need to
-        // CREATE_DIR on S3. However, libhdfs always checks if a path exists 
before
-        // carrying out an operation on that path. So we still need to call 
CREATE_DIR
-        // before we access that path due to this limitation.
-        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
-          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
-        } else {
-          // Otherwise just create the directory.
-          partition_create_ops.Add(CREATE_DIR, part_path);
-        }
-      }
-    } else if (!is_s3_path
-        || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
-      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition 
directories
-      // would have already been created by the table sinks.
-      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-        PopulatePathPermissionCache(
-            partition_fs_connection, part_path, &permissions_cache);
-      }
-      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
-        partition_create_ops.Add(CREATE_DIR, part_path);
-      }
-    }
-  }
-
-  // We're done with the HDFS descriptor - free up its resources.
-  hdfs_table->ReleaseResources();
-  hdfs_table = nullptr;
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, 
"Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    if (!partition_create_ops.Execute(
-        ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
-        // It's ok to ignore errors creating the directories, since they may 
already
-        // exist. If there are permission errors, we'll run into them later.
-        if (err.first->op() != CREATE_DIR) {
-          return Status(Substitute(
-              "Error(s) deleting partition directories. First error (of $0) 
was: $1",
-              partition_create_ops.errors().size(), err.second));
-        }
-      }
-    }
-  }
-
-  // 3. Move all tmp files
-  HdfsOperationSet move_ops(&filesystem_connection_cache);
-  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
-
-  for (FileMoveMap::value_type& move: files_to_move_) {
-    // Empty destination means delete, so this is a directory. These get 
deleted in a
-    // separate pass to ensure that we have moved all the contents of the 
directory first.
-    if (move.second.empty()) {
-      VLOG_ROW << "Deleting file: " << move.first;
-      dir_deletion_ops.Add(DELETE, move.first);
-    } else {
-      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
-      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
-        move_ops.Add(RENAME, move.first, move.second);
-      } else {
-        move_ops.Add(MOVE, move.first, move.second);
-      }
-    }
-  }
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", 
"FinalizationTimer"));
-    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
-      stringstream ss;
-      ss << "Error(s) moving partition files. First error (of "
-         << move_ops.errors().size() << ") was: " << 
move_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 4. Delete temp directories
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
-         "FinalizationTimer"));
-    if 
(!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
-      stringstream ss;
-      ss << "Error(s) deleting staging directories. First error (of "
-         << dir_deletion_ops.errors().size() << ") was: "
-         << dir_deletion_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 5. Optionally update the permissions of the created partition directories
-  // Do this last so that we don't make a dir unwritable before we write to it.
-  if (FLAGS_insert_inherit_permissions) {
-    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
-    for (const PermissionCache::value_type& perm: permissions_cache) {
-      bool new_dir = perm.second.first;
-      if (new_dir) {
-        short permissions = perm.second.second;
-        VLOG_QUERY << "INSERT created new directory: " << perm.first
-                   << ", inherited permissions are: " << oct << permissions;
-        chmod_ops.Add(CHMOD, perm.first, permissions);
-      }
-    }
-    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
-      stringstream ss;
-      ss << "Error(s) setting permissions on newly created partition 
directories. First"
-         << " error (of " << chmod_ops.errors().size() << ") was: "
-         << chmod_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  return Status::OK();
-}
-
-Status Coordinator::FinalizeQuery() {
+Status Coordinator::FinalizeHdfsInsert() {
   // All instances must have reported their final statuses before 
finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to 
clean up the
   // staging directory.
   DCHECK(has_called_wait_);
-  DCHECK(needs_finalization_);
+  DCHECK(finalize_params() != nullptr);
 
   VLOG_QUERY << "Finalizing query: " << query_id();
   SCOPED_TIMER(finalization_timer_);
   Status return_status = GetStatus();
   if (return_status.ok()) {
-    return_status = FinalizeSuccessfulInsert();
+    HdfsTableDescriptor* hdfs_table;
+    
RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
+            finalize_params()->table_id, obj_pool(), &hdfs_table));
+    DCHECK(hdfs_table != nullptr)
+        << "INSERT target table not known in descriptor table: "
+        << finalize_params()->table_id;
+    return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
+        query_ctx().client_request.query_options.s3_skip_insert_staging,
+        hdfs_table, query_profile_);
+    hdfs_table->ReleaseResources();
   }
 
   stringstream staging_dir;
-  DCHECK(finalize_params_.__isset.staging_dir);
-  staging_dir << finalize_params_.staging_dir << "/" << 
PrintId(query_id(),"_") << "/";
+  DCHECK(finalize_params()->__isset.staging_dir);
+  staging_dir << finalize_params()->staging_dir << "/" << 
PrintId(query_id(),"_") << "/";
 
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), 
&hdfs_conn));
@@ -813,26 +548,26 @@ Status Coordinator::Wait() {
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported
-  // relevant state. They only have relevant state to report in the parallel
-  // INSERT case, otherwise all the relevant state is from the coordinator
-  // fragment which will be available after Open() returns.
-  // Ignore the returned status if finalization is required., since 
FinalizeQuery() will
-  // pick it up and needs to execute regardless.
+  // Query finalization can only happen when all backends have reported 
relevant
+  // state. They only have relevant state to report in the parallel INSERT 
case,
+  // otherwise all the relevant state is from the coordinator fragment which 
will be
+  // available after Open() returns.  Ignore the returned status if 
finalization is
+  // required, since FinalizeHdfsInsert() will pick it up and needs to execute
+  // regardless.
   Status status = WaitForBackendCompletion();
-  if (!needs_finalization_ && !status.ok()) return status;
+  if (finalize_params() == nullptr && !status.ok()) return status;
 
   // Execution of query fragments has finished. We don't need to hold onto 
query execution
   // resources while we finalize the query.
   ReleaseExecResources();
   // Query finalization is required only for HDFS table sinks
-  if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
   // Release admission control resources after we'd done the potentially 
heavyweight
   // finalization.
   ReleaseAdmissionControlResources();
 
   query_profile_->AddInfoString(
-      "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
+      "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
   // For DML queries, when Wait is done, the query is complete.
   ComputeQuerySummary();
   return status;
@@ -929,7 +664,7 @@ Status Coordinator::UpdateBackendExecStatus(const 
TReportExecStatusParams& param
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
   if (params.__isset.insert_exec_status) {
-    UpdateInsertExecStatus(params.insert_exec_status);
+    dml_exec_state_.Update(params.insert_exec_status);
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, 
&progress_)) {
@@ -971,52 +706,10 @@ Status Coordinator::UpdateBackendExecStatus(const 
TReportExecStatusParams& param
   return Status::OK();
 }
 
-void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& 
insert_exec_status) {
-  lock_guard<mutex> l(lock_);
-  for (const PartitionStatusMap::value_type& partition:
-       insert_exec_status.per_partition_status) {
-    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-    status->__set_num_modified_rows(
-        status->num_modified_rows + partition.second.num_modified_rows);
-    status->__set_kudu_latest_observed_ts(std::max(
-        partition.second.kudu_latest_observed_ts, 
status->kudu_latest_observed_ts));
-    status->__set_id(partition.second.id);
-    status->__set_partition_base_dir(partition.second.partition_base_dir);
-
-    if (partition.second.__isset.stats) {
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeDmlStats(partition.second.stats, &status->stats);
-    }
-  }
-  files_to_move_.insert(
-      insert_exec_status.files_to_move.begin(), 
insert_exec_status.files_to_move.end());
-}
-
-
-uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
-  uint64_t max_ts = 0;
-  for (const auto& entry : per_partition_status_) {
-    max_ts = std::max(max_ts,
-        static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
-  }
-  return max_ts;
-}
-
 RuntimeState* Coordinator::runtime_state() {
   return coord_instance_ == nullptr ? nullptr : 
coord_instance_->runtime_state();
 }
 
-bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
-  // Assume we are called only after all fragments have completed
-  DCHECK(has_called_wait_);
-
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) 
{
-    catalog_update->created_partitions.insert(partition.first);
-  }
-
-  return catalog_update->created_partitions.size() != 0;
-}
-
 // TODO: add histogram/percentile
 void Coordinator::ComputeQuerySummary() {
   // In this case, the query did not even get to start all fragment instances.
@@ -1285,4 +978,8 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   }
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
+
+const TFinalizeParams* Coordinator::finalize_params() const {
+  return schedule_.request().__isset.finalize_params
+      ? &schedule_.request().finalize_params : nullptr;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d630b9a..6665c08 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,7 +38,8 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
+#include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
 #include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
@@ -130,9 +131,9 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Idempotent.
   void Cancel(const Status* cause = nullptr);
 
-  /// Updates execution status of a particular backend as well as 
Insert-related
-  /// status (per_partition_status_ and files_to_move_). Also updates
-  /// num_remaining_backends_ and cancels execution if the backend has an 
error status.
+  /// Updates execution status of a particular backend as well as 
dml_exec_state_.
+  /// Also updates num_remaining_backends_ and cancels execution if the 
backend has an
+  /// error status.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
@@ -149,17 +150,7 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   MemTracker* query_mem_tracker() const;
 
   /// This is safe to call only after Wait()
-  const PartitionStatusMap& per_partition_status() { return 
per_partition_status_; }
-
-  /// Returns the latest Kudu timestamp observed across any backends where DML 
into Kudu
-  /// was executed, or 0 if there were no Kudu timestamps reported.
-  /// This should only be called after Wait().
-  uint64_t GetLatestKuduInsertTimestamp() const;
-
-  /// Gathers all updates to the catalog required once this query has 
completed execution.
-  /// Returns true if a catalog update is required, false otherwise.
-  /// Must only be called after Wait()
-  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages 
from the
   /// individual fragment instances are merged into a single output to retain 
readability.
@@ -229,12 +220,6 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// True if the query needs a post-execution step to tidy up
-  bool needs_finalization_ = false;
-
-  /// Only valid if needs_finalization is true
-  TFinalizeParams finalize_params_;
-
   /// ensures single-threaded execution of Wait(); must not hold lock_ when 
acquiring this
   boost::mutex wait_lock_;
 
@@ -275,6 +260,11 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
 
   ExecSummary exec_summary_;
 
+  /// Filled in as the query completes and tracks the results of DML queries.  
This is
+  /// either the union of the reports from all fragment instances, or taken 
from the
+  /// coordinator fragment: only one of the two can legitimately produce 
updates.
+  DmlExecState dml_exec_state_;
+
   /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
   RuntimeProfile* query_profile_ = nullptr;
 
@@ -308,21 +298,6 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// hits 0, any Wait()'ing thread is notified
   int num_remaining_backends_ = 0;
 
-  /// The following two structures, partition_row_counts_ and files_to_move_ 
are filled in
-  /// as the query completes, and track the results of INSERT queries that 
alter the
-  /// structure of tables. They are either the union of the reports from all 
fragment
-  /// instances, or taken from the coordinator fragment: only one of the two 
can
-  /// legitimately produce updates.
-
-  /// The set of partitions that have been written to or updated by all 
fragment
-  /// instances, along with statistics such as the number of rows written (may 
be 0). For
-  /// unpartitioned tables, the empty string denotes the entire table.
-  PartitionStatusMap per_partition_status_;
-
-  /// The set of files to move after an INSERT query has run, in (src, dest) 
form. An
-  /// empty string for the destination means that a file is to be deleted.
-  FileMoveMap files_to_move_;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
@@ -357,6 +332,12 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  /// Returns request's finalize params, or nullptr if not present. If not 
present, then
+  /// HDFS INSERT finalization is not required.
+  const TFinalizeParams* finalize_params() const;
+
+  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+
   /// Only valid *after* calling Exec(). Return nullptr if the running query 
does not
   /// produce any rows.
   RuntimeState* runtime_state();
@@ -381,9 +362,6 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   Status UpdateStatus(const Status& status, const std::string& 
backend_hostname,
       bool is_fragment_failure, const TUniqueId& failed_fragment) 
WARN_UNUSED_RESULT;
 
-  /// Update per_partition_status_ and files_to_move_.
-  void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);
-
   /// Returns only when either all execution backends have reported success or 
the query
   /// is in error. Returns the status of the query.
   /// It is safe to call this concurrently, but any calls must be made only 
after Exec().
@@ -403,29 +381,11 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// profiles must not be updated while this is running.
   void ComputeQuerySummary();
 
-  /// TODO: move the next 3 functions into a separate class
-
-  /// Determines what the permissions of directories created by INSERT 
statements should
-  /// be if permission inheritance is enabled. Populates a map from all 
prefixes of
-  /// path_str (including the full path itself) which is a path in Hdfs, to 
pairs
-  /// (does_not_exist, permissions), where does_not_exist is true if the path 
does not
-  /// exist in Hdfs. If does_not_exist is true, permissions is set to the 
permissions of
-  /// the most immediate ancestor of the path that does exist, i.e. the 
permissions that
-  /// the path should inherit when created. Otherwise permissions is set to 
the actual
-  /// permissions of the path. The PermissionCache argument is also used to 
cache the
-  /// output across repeated calls, to avoid repeatedly calling 
hdfsGetPathInfo() on the
-  /// same path.
-  typedef boost::unordered_map<std::string, std::pair<bool, short>> 
PermissionCache;
-  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
-      PermissionCache* permissions_cache);
-
-  /// Moves all temporary staging files to their final destinations.
-  Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT;
-
-  /// Perform any post-query cleanup required. Called by Wait() only after all 
fragment
-  /// instances have returned, or if the query has failed, in which case it 
only cleans up
-  /// temporary data rather than finishing the INSERT in flight.
-  Status FinalizeQuery() WARN_UNUSED_RESULT;
+  /// Perform any post-query cleanup required for HDFS (or other Hadoop 
FileSystem)
+  /// INSERT. Called by Wait() only after all fragment instances have 
returned, or if
+  /// the query has failed, in which case it only cleans up temporary data 
rather than
+  /// finishing the INSERT in flight.
+  Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
   /// Populates backend_states_, starts query execution at all backends in 
parallel, and
   /// blocks until startup completes.

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
new file mode 100644
index 0000000..6853da5
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.cc
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/dml-exec-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/filesystem.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "util/pretty-printer.h"
+#include "util/container-util.h"
+#include "util/hdfs-bulk-ops.h"
+#include "util/hdfs-util.h"
+#include "util/runtime-profile-counters.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/exec-env.h"
+#include "gen-cpp/ImpalaService_types.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include "common/names.h"
+
+DEFINE_bool(insert_inherit_permissions, false, "If true, new directories 
created by "
+    "INSERTs will inherit the permissions of their parent directories");
+
+using namespace impala;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+
+string DmlExecState::OutputPartitionStats(const string& prefix) {
+  lock_guard<mutex> l(lock_);
+  const char* indent = "  ";
+  stringstream ss;
+  ss << prefix;
+  bool first = true;
+  for (const PartitionStatusMap::value_type& val: per_partition_status_) {
+    if (!first) ss << endl;
+    first = false;
+    ss << "Partition: ";
+    const string& partition_key = val.first;
+    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) 
{
+      ss << "Default" << endl;
+    } else {
+      ss << partition_key << endl;
+    }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
+    const TInsertStats& stats = val.second.stats;
+    if (stats.__isset.kudu_stats) {
+      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    }
+
+    ss << indent << "BytesWritten: "
+       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
+    if (stats.__isset.parquet_stats) {
+      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
+      ss << endl << indent << "Per Column Sizes:";
+      for (map<string, int64_t>::const_iterator i = 
parquet_stats.per_column_size.begin();
+           i != parquet_stats.per_column_size.end(); ++i) {
+        ss << endl << indent << indent << i->first << ": "
+           << PrettyPrinter::Print(i->second, TUnit::BYTES);
+      }
+    }
+  }
+  return ss.str();
+}
+
+void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition:
+           dml_exec_status.per_partition_status) {
+    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+    status->__set_num_modified_rows(
+        status->num_modified_rows + partition.second.num_modified_rows);
+    status->__set_kudu_latest_observed_ts(max<uint64_t>(
+            partition.second.kudu_latest_observed_ts, 
status->kudu_latest_observed_ts));
+    status->__set_id(partition.second.id);
+    status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+    if (partition.second.__isset.stats) {
+      if (!status->__isset.stats) status->__set_stats(TInsertStats());
+      MergeDmlStats(partition.second.stats, &status->stats);
+    }
+  }
+  files_to_move_.insert(
+      dml_exec_status.files_to_move.begin(), 
dml_exec_status.files_to_move.end());
+}
+
+void DmlExecState::AddFileToMove(const string& file_name, const string& 
location) {
+  lock_guard<mutex> l(lock_);
+  files_to_move_[file_name] = location;
+}
+
+uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
+  lock_guard<mutex> l(lock_);
+  uint64_t max_ts = 0;
+  for (const auto& entry : per_partition_status_) {
+    max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts);
+  }
+  return max_ts;
+}
+
+int64_t DmlExecState::GetNumModifiedRows() {
+  lock_guard<mutex> l(lock_);
+  int64_t result = 0;
+  for (const PartitionStatusMap::value_type& p: per_partition_status_) {
+    result += p.second.num_modified_rows;
+  }
+  return result;
+}
+
+bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) 
{
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) 
{
+    catalog_update->created_partitions.insert(partition.first);
+  }
+  return catalog_update->created_partitions.size() != 0;
+}
+
+Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
+    bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
+    RuntimeProfile* profile) {
+  lock_guard<mutex> l(lock_);
+  PermissionCache permissions_cache;
+  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
+  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
+
+  // INSERT finalization happens in the five following steps
+  // 1. If OVERWRITE, remove all the files in the target directory
+  // 2. Create all the necessary partition directories.
+
+  // Loop over all partitions that were updated by this insert, and create the 
set of
+  // filesystem operations required to create the correct partition structure 
on disk.
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) 
{
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    // INSERT allows writes to tables that have partitions on multiple 
filesystems.
+    // So we need to open connections to different filesystems as necessary. 
We use a
+    // local connection cache and populate it with one connection per 
filesystem that the
+    // partitions are on.
+    hdfsFS partition_fs_connection;
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+            partition.second.partition_base_dir, &partition_fs_connection,
+            &filesystem_connection_cache));
+
+    // Look up the partition in the descriptor table.
+    stringstream part_path_ss;
+    if (partition.second.id == -1) {
+      // If this is a non-existent partition, use the default partition 
location of
+      // <base_dir>/part_key_1=val/part_key_2=val/...
+      part_path_ss << params.hdfs_base_dir << "/" << partition.first;
+    } else {
+      HdfsPartitionDescriptor* part = 
hdfs_table->GetPartition(partition.second.id);
+      DCHECK(part != nullptr)
+          << "table_id=" << hdfs_table->id() << " partition_id=" << 
partition.second.id;
+      part_path_ss << part->location();
+    }
+    const string& part_path = part_path_ss.str();
+    bool is_s3_path = IsS3APath(part_path.c_str());
+
+    // If this is an overwrite insert, we will need to delete any updated 
partitions
+    if (params.is_overwrite) {
+      if (partition.first.empty()) {
+        // If the root directory is written to, then the table must not be 
partitioned
+        DCHECK(per_partition_status_.size() == 1);
+        // We need to be a little more careful, and only delete data files in 
the root
+        // because the tmp directories the sink(s) wrote are there also.
+        // So only delete files in the table directory - all files are treated 
as data
+        // files by Hive and Impala, but directories are ignored (and may 
legitimately
+        // be used to store permanent non-table data by other applications).
+        int num_files = 0;
+        // hdfsListDirectory() only sets errno if there is an error, but it 
doesn't set
+        // it to 0 if the call succeeds. When there is no error, errno could be 
any
+        // value. So need to clear errno before calling it.
+        // Once HDFS-8407 is fixed, the errno reset won't be needed.
+        errno = 0;
+        hdfsFileInfo* existing_files =
+            hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
+        if (existing_files == nullptr && errno == EAGAIN) {
+          errno = 0;
+          existing_files =
+              hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
+        }
+        // hdfsListDirectory() returns nullptr not only when there is an error 
but also
+        // when the directory is empty (HDFS-8407). Need to check errno to make 
sure
+        // the call fails.
+        if (existing_files == nullptr && errno != 0) {
+          return Status(GetHdfsErrorMsg("Could not list directory: ", 
part_path));
+        }
+        for (int i = 0; i < num_files; ++i) {
+          const string filename =
+              
boost::filesystem::path(existing_files[i].mName).filename().string();
+          if (existing_files[i].mKind == kObjectKindFile && 
!IsHiddenFile(filename)) {
+            partition_create_ops.Add(DELETE, existing_files[i].mName);
+          }
+        }
+        hdfsFreeFileInfo(existing_files, num_files);
+      } else {
+        // This is a partition directory, not the root directory; we can delete
+        // recursively with abandon, after checking that it ever existed.
+        // TODO: There's a potential race here between checking for the 
directory
+        // and a third-party deleting it.
+        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+          // There is no directory structure in S3, so "inheriting" 
permissions is not
+          // possible.
+          // TODO: Try to mimic inheriting permissions for S3.
+          PopulatePathPermissionCache(
+              partition_fs_connection, part_path, &permissions_cache);
+        }
+        // S3 doesn't have a directory structure, so we technically wouldn't 
need to
+        // CREATE_DIR on S3. However, libhdfs always checks if a path exists 
before
+        // carrying out an operation on that path. So we still need to call 
CREATE_DIR
+        // before we access that path due to this limitation.
+        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
+          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
+        } else {
+          // Otherwise just create the directory.
+          partition_create_ops.Add(CREATE_DIR, part_path);
+        }
+      }
+    } else if (!is_s3_path || !s3_skip_insert_staging) {
+      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition 
directories
+      // would have already been created by the table sinks.
+      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+        PopulatePathPermissionCache(
+            partition_fs_connection, part_path, &permissions_cache);
+      }
+      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
+        partition_create_ops.Add(CREATE_DIR, part_path);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    if (!partition_create_ops.Execute(
+            ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
+        // It's ok to ignore errors creating the directories, since they may 
already
+        // exist. If there are permission errors, we'll run into them later.
+        if (err.first->op() != CREATE_DIR) {
+          return Status(Substitute(
+                  "Error(s) deleting partition directories. First error (of 
$0) was: $1",
+                  partition_create_ops.errors().size(), err.second));
+        }
+      }
+    }
+  }
+
+  // 3. Move all tmp files
+  HdfsOperationSet move_ops(&filesystem_connection_cache);
+  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
+
+  for (FileMoveMap::value_type& move: files_to_move_) {
+    // Empty destination means delete, so this is a directory. These get 
deleted in a
+    // separate pass to ensure that we have moved all the contents of the 
directory first.
+    if (move.second.empty()) {
+      VLOG_ROW << "Deleting file: " << move.first;
+      dir_deletion_ops.Add(DELETE, move.first);
+    } else {
+      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
+      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+        move_ops.Add(RENAME, move.first, move.second);
+      } else {
+        move_ops.Add(MOVE, move.first, move.second);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", 
"FinalizationTimer"));
+    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
+      stringstream ss;
+      ss << "Error(s) moving partition files. First error (of "
+         << move_ops.errors().size() << ") was: " << 
move_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 4. Delete temp directories
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", 
"FinalizationTimer"));
+    if 
(!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
+      stringstream ss;
+      ss << "Error(s) deleting staging directories. First error (of "
+         << dir_deletion_ops.errors().size() << ") was: "
+         << dir_deletion_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 5. Optionally update the permissions of the created partition directories
+  // Do this last so that we don't make a dir unwritable before we write to it.
+  if (FLAGS_insert_inherit_permissions) {
+    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
+    for (const PermissionCache::value_type& perm: permissions_cache) {
+      bool new_dir = perm.second.first;
+      if (new_dir) {
+        short permissions = perm.second.second;
+        VLOG_QUERY << "INSERT created new directory: " << perm.first
+                   << ", inherited permissions are: " << oct << permissions;
+        chmod_ops.Add(CHMOD, perm.first, permissions);
+      }
+    }
+    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
+      stringstream ss;
+      ss << "Error(s) setting permissions on newly created partition 
directories. First"
+         << " error (of " << chmod_ops.errors().size() << ") was: "
+         << chmod_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& 
path_str,
+    PermissionCache* permissions_cache) {
+  // Find out if the path begins with a hdfs:// -style prefix, and remove it 
and the
+  // location (e.g. host:port) if so.
+  int scheme_end = path_str.find("://");
+  string stripped_str;
+  if (scheme_end != string::npos) {
+    // Skip past the subsequent location:port/ prefix.
+    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
+  } else {
+    stripped_str = path_str;
+  }
+
+  // Get the list of path components, used to build all path prefixes.
+  vector<string> components;
+  split(components, stripped_str, is_any_of("/"));
+
+  // Build a set of all prefixes (including the complete string) of 
stripped_path. So
+  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
+  vector<string> prefixes;
+  // Stores the current prefix
+  stringstream accumulator;
+  for (const string& component: components) {
+    if (component.empty()) continue;
+    accumulator << "/" << component;
+    prefixes.push_back(accumulator.str());
+  }
+
+  // Now for each prefix, stat() it to see if a) it exists and b) if so what 
its
+  // permissions are. When we meet a directory that doesn't exist, we record 
the fact that
+  // we need to create it, and the permissions of its parent dir to inherit.
+  //
+  // Every prefix is recorded in the PermissionCache so we don't do more than 
one stat()
+  // for each path. If we need to create the directory, we record it as the 
pair (true,
+  // perms) so that the caller can identify which directories need their 
permissions
+  // explicitly set.
+
+  // Set to the permission of the immediate parent (i.e. the permissions to 
inherit if the
+  // current dir doesn't exist).
+  short permissions = 0;
+  for (const string& path: prefixes) {
+    PermissionCache::const_iterator it = permissions_cache->find(path);
+    if (it == permissions_cache->end()) {
+      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
+      if (info != nullptr) {
+        // File exists, so fill the cache with its current permissions.
+        permissions_cache->insert(
+            make_pair(path, make_pair(false, info->mPermissions)));
+        permissions = info->mPermissions;
+        hdfsFreeFileInfo(info, 1);
+      } else {
+        // File doesn't exist, so we need to set its permissions to its 
immediate parent
+        // once it's been created.
+        permissions_cache->insert(make_pair(path, make_pair(true, 
permissions)));
+      }
+    } else {
+      permissions = it->second.second;
+    }
+  }
+}
+
+bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+  lock_guard<mutex> l(lock_);
+  bool set_thrift = false;
+  if (files_to_move_.size() > 0) {
+    dml_status->__set_files_to_move(files_to_move_);
+    set_thrift = true;
+  }
+  if (per_partition_status_.size() > 0) {
+    dml_status->__set_per_partition_status(per_partition_status_);
+    set_thrift = true;
+  }
+  return set_thrift;
+}
+
+void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
+  lock_guard<mutex> l(lock_);
+  int64_t num_row_errors = 0;
+  bool has_kudu_stats = false;
+  for (const PartitionStatusMap::value_type& v: per_partition_status_) {
+    insert_result->rows_modified[v.first] = v.second.num_modified_rows;
+    if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+      has_kudu_stats = true;
+    }
+    num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+  }
+  if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+}
+
+void DmlExecState::AddPartition(
+    const string& name, int64_t id, const string* base_dir) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(id);
+  status.__isset.stats = true;
+  if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+  per_partition_status_.insert(make_pair(name, status));
+}
+
+void DmlExecState::UpdatePartition(const string& partition_name,
+    int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = 
per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.num_modified_rows += num_modified_rows_delta;
+  if (insert_stats == nullptr) return;
+  MergeDmlStats(*insert_stats, &entry->second.stats);
+}
+
+void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
+  dst->bytes_written += src.bytes_written;
+  if (src.__isset.kudu_stats) {
+    dst->__isset.kudu_stats = true;
+    if (!dst->kudu_stats.__isset.num_row_errors) {
+      dst->kudu_stats.__set_num_row_errors(0);
+    }
+    dst->kudu_stats.__set_num_row_errors(
+        dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+  }
+  if (src.__isset.parquet_stats) {
+    if (dst->__isset.parquet_stats) {
+      MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
+          &dst->parquet_stats.per_column_size);
+    } else {
+      dst->__set_parquet_stats(src.parquet_stats);
+    }
+  }
+}
+
+void DmlExecState::InitForKuduDml() {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = 
g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(partition_name) == 
per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(-1L);
+  status.__isset.stats = true;
+  status.stats.__isset.kudu_stats = true;
+  per_partition_status_.insert(make_pair(partition_name, status));
+}
+
+void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t 
num_row_errors,
+    int64_t latest_ts) {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = 
g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = 
per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.__set_num_modified_rows(num_modified_rows);
+  entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
+  entry->second.__set_kudu_latest_observed_ts(latest_ts);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
new file mode 100644
index 0000000..728284a
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DML_EXEC_STATE_H
+#define IMPALA_RUNTIME_DML_EXEC_STATE_H
+
+#include <string>
+#include <map>
+#include <boost/unordered_map.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+
+namespace impala {
+
+class TInsertExecStatus;
+class TInsertResult;
+class TInsertStats;
+class TFinalizeParams;
+class TUpdateCatalogRequest;
+class TInsertPartitionStatus;
+class RuntimeProfile;
+class HdfsTableDescriptor;
+
+/// DmlExecState manages the state related to the execution of a DML statement
+/// (creation of new files, new partitions, etc.).
+///
+/// During DML execution, the table sink adds per-partition status using 
AddPartition()
+/// and then UpdatePartition() for non-Kudu tables.  For Kudu tables, the sink 
adds DML
+/// stats using InitForKuduDml() followed by SetKuduDmlStats().  In the case 
of the
+/// HDFS sink, it will also record the collection of files that should be 
moved by the
+/// coordinator on finalization using AddFileToMove().
+///
+/// The state is then serialized to thrift and merged at the coordinator using
+/// Update().  The coordinator will then use OutputPartitionStats(),
+/// GetKuduLatestObservedTimestamp(), PrepareCatalogUpdate() and 
FinalizeHdfsInsert()
+/// to perform various finalization tasks.
+///
+
+/// Thread-safe.
class DmlExecState {
 public:
  /// Merge values from 'dml_exec_status'.
  void Update(const TInsertExecStatus& dml_exec_status);

  /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
  /// It is an error to call this for an existing partition.
  void AddPartition(const std::string& name, int64_t id, const std::string* base_dir);

  /// Merge given values into stats for partition with name 'partition_name'.
  /// Ignores 'insert_stats' if nullptr.
  /// Requires that the partition already exist.
  void UpdatePartition(const std::string& partition_name,
      int64_t num_modified_rows_delta, const TInsertStats* insert_stats);

  /// Used to initialize this state when executing a Kudu DML statement. Must be
  /// called before SetKuduDmlStats().
  void InitForKuduDml();

  /// Update stats for a Kudu DML sink. Requires that InitForKuduDml() was already
  /// called.
  void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
      int64_t latest_ts);

  /// Adds new file/location to the move map.
  void AddFileToMove(const std::string& file_name, const std::string& location);

  /// Outputs the partition stats to a string.
  std::string OutputPartitionStats(const std::string& prefix);

  /// Returns the latest Kudu timestamp observed across any backends where DML into
  /// Kudu was executed, or 0 if there were no Kudu timestamps reported.
  uint64_t GetKuduLatestObservedTimestamp();

  /// Return the total number of modified rows across all partitions.
  int64_t GetNumModifiedRows();

  /// Populates 'catalog_update' with PartitionStatusMap data.
  /// Returns true if a catalog update is required, false otherwise.
  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);

  /// For HDFS (and other Hadoop FileSystem) INSERT, moves all temporary staging files
  /// to their final destinations, as indicated by 'params', and creates new partitions
  /// for 'hdfs_table' as required.  Adds child timers to profile for the various
  /// stages of finalization.  If the table is on an S3 path and
  /// 's3_skip_insert_staging' is true, does not create new partition directories.
  Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
      HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;

  /// Serialize to thrift. Returns true if any fields of 'dml_status' were set.
  bool ToThrift(TInsertExecStatus* dml_status);

  /// Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
  /// Beeswax.
  void ToTInsertResult(TInsertResult* insert_result);

 private:
  /// Protects all fields below.
  boost::mutex lock_;

  /// Counts how many rows a DML query has added to a particular partition (partitions
  /// are identified by their partition keys: k1=v1/k2=v2 etc. Unpartitioned tables
  /// have a single 'default' partition which is identified by ROOT_PARTITION_KEY.
  /// Uses ordered map so that iteration order is deterministic.
  typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
  PartitionStatusMap per_partition_status_;

  /// Tracks files to move from a temporary (key) to a final destination (value) as
  /// part of query finalization. If the destination is empty, the file is to be
  /// deleted.  Uses ordered map so that iteration order is deterministic.
  typedef std::map<std::string, std::string> FileMoveMap;
  FileMoveMap files_to_move_;

  /// Determines what the permissions of directories created by INSERT statements should
  /// be if permission inheritance is enabled. Populates a map from all prefixes of
  /// 'path_str' (including the full path itself) which is a path in Hdfs, to pairs
  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
  /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
  /// the most immediate ancestor of the path that does exist, i.e. the permissions that
  /// the path should inherit when created. Otherwise permissions is set to the actual
  /// permissions of the path. The PermissionCache argument is also used to cache the
  /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
  /// same path.
  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
      PermissionCache* permissions_cache);

  /// Merge 'src' into 'dst'. Not thread-safe.
  void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1cc646d..ad5748f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -236,18 +236,9 @@ void QueryState::ReportExecStatusAux(bool done, const 
Status& status,
     // Only send updates to insert status if fragment is finished, the 
coordinator waits
     // until query execution is done to use them anyhow.
     RuntimeState* state = fis->runtime_state();
-    if (done && (state->hdfs_files_to_move()->size() > 0
-        || state->per_partition_status()->size() > 0)) {
-      TInsertExecStatus insert_status;
-      if (state->hdfs_files_to_move()->size() > 0) {
-        insert_status.__set_files_to_move(*state->hdfs_files_to_move());
-      }
-      if (state->per_partition_status()->size() > 0) {
-        
insert_status.__set_per_partition_status(*state->per_partition_status());
-      }
-      params.__set_insert_exec_status(insert_status);
+    if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) 
{
+      params.__isset.insert_exec_status = true;
     }
-
     // Send new errors to coordinator
     state->GetUnreportedErrors(&params.error_log);
     params.__isset.error_log = (params.error_log.size() > 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc 
b/be/src/runtime/runtime-filter-bank.cc
index 239e066..4e23a42 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index cd2b061..4b005b2 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
 #include "runtime/thread-resource-mgr.h"
+#include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
@@ -56,22 +57,6 @@ namespace io {
   class DiskIoMgr;
 }
 
-/// TODO: move the typedefs into a separate .h (and fix the includes for that)
-
-/// Counts how many rows an INSERT query has added to a particular partition
-/// (partitions are identified by their partition keys: k1=v1/k2=v2
-/// etc. Unpartitioned tables have a single 'default' partition which is
-/// identified by ROOT_PARTITION_KEY.
-typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
-
-/// Stats per partition for insert queries. They key is the same as for 
PartitionRowCount
-typedef std::map<std::string, TInsertStats> PartitionInsertStats;
-
-/// Tracks files to move from a temporary (key) to a final destination (value) 
as
-/// part of query finalization. If the destination is empty, the file is to be
-/// deleted.
-typedef std::map<std::string, std::string> FileMoveMap;
-
 /// A collection of items that are part of the global state of a query and 
shared across
 /// all execution nodes of that query. After initialisation, callers must call
 /// ReleaseResources() to ensure that all resources are correctly freed before
@@ -133,8 +118,6 @@ class RuntimeState {
   }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
-  FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
     root_node_id_ = id;
@@ -146,7 +129,7 @@ class RuntimeState {
 
   RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
-  PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return profile_; }
@@ -341,12 +324,8 @@ class RuntimeState {
   /// state is responsible for returning this pool to the thread mgr.
   ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
 
-  /// Temporary Hdfs files created, and where they should be moved to 
ultimately.
-  /// Mapping a filename to a blank destination causes it to be deleted.
-  FileMoveMap hdfs_files_to_move_;
-
-  /// Records summary statistics for the results of inserts into Hdfs 
partitions.
-  PartitionStatusMap per_partition_status_;
+  /// Execution state for DML statements.
+  DmlExecState dml_exec_state_;
 
   RuntimeProfile* const profile_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index ac6178c..2aedcab 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -570,7 +571,8 @@ void ClientRequestState::Done() {
   // must happen before taking lock_ below.
   if (coord_.get() != NULL) {
     // This is safe to access on coord_ after Wait() has been called.
-    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    uint64_t latest_kudu_ts =
+        coord_->dml_exec_state()->GetKuduLatestObservedTimestamp();
     if (latest_kudu_ts > 0) {
       VLOG_RPC << "Updating session (id=" << session_id()  << ") with latest "
                << "observed Kudu timestamp: " << latest_kudu_ts;
@@ -918,7 +920,7 @@ Status ClientRequestState::UpdateCatalog() {
     catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
-    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+    if (!coord()->dml_exec_state()->PrepareCatalogUpdate(&catalog_update)) {
       VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                  << query_id() << ")";
     } else {
@@ -1027,9 +1029,7 @@ void 
ClientRequestState::SetCreateTableAsSelectResultSet() {
   // operation.
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
-    for (const PartitionStatusMap::value_type& p: 
coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_modified_rows;
-    }
+    total_num_rows_inserted = coord_->dml_exec_state()->GetNumModifiedRows();
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", 
total_num_rows_inserted);
   VLOG_QUERY << summary_msg;

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 0c05bc4..657f3de 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -24,6 +24,7 @@
 #include "scheduling/query-schedule.h"
 #include "service/child-query.h"
 #include "service/impala-server.h"
+#include "service/query-result-set.h"
 #include "util/auth-util.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc 
b/be/src/service/impala-beeswax-server.cc
index b827ff3..c441285 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -20,10 +20,12 @@
 #include "common/logging.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/auth-util.h"
@@ -563,22 +565,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& 
query_id,
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we 
might
       // need to revisit this, since that might lead us to insert a row 
without a
       // coordinator, depending on how we choose to drive the table sink.
-      int64_t num_row_errors = 0;
-      bool has_kudu_stats = false;
       if (request_state->coord() != nullptr) {
-        for (const PartitionStatusMap::value_type& v:
-             request_state->coord()->per_partition_status()) {
-          const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_modified[partition_status.first] =
-              partition_status.second.num_modified_rows;
-
-          if (partition_status.second.__isset.stats &&
-              partition_status.second.stats.__isset.kudu_stats) {
-            has_kudu_stats = true;
-          }
-          num_row_errors += 
partition_status.second.stats.kudu_stats.num_row_errors;
-        }
-        if (has_kudu_stats) 
insert_result->__set_num_row_errors(num_row_errors);
+        
request_state->coord()->dml_exec_state()->ToTInsertResult(insert_result);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc 
b/be/src/service/impala-hs2-server.cc
index 4381f81..80ace87 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -34,6 +34,7 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/raw-value.h"
 #include "runtime/exec-env.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index 43b03d1..3841bfe 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,6 +24,7 @@
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -31,6 +32,7 @@
 #include "runtime/timestamp-value.inline.h"
 #include "service/impala-server.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"

Reply via email to