IMPALA-5384, part 1: introduce DmlExecState

This change is based on a patch by Marcel Kornacker.

Move data structures that collect DML operation stats from the RuntimeState
and Coordinator into a new DmlExecState class, which has its own lock. This
removes a dependency on the coordinator's lock, which will allow further
coordinator locking cleanup in the next patch.

Change-Id: Id4c025917620a7bff2acbeb46464f107ab4b7565
Reviewed-on: http://gerrit.cloudera.org:8080/9793
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/408ee4d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/408ee4d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/408ee4d6

Branch: refs/heads/master
Commit: 408ee4d68cbb7217eba4b020064a1e878e412ce8
Parents: b78daed
Author: Dan Hecht <[email protected]>
Authored: Fri Mar 23 16:28:27 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Mar 29 06:29:16 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc         |   1 +
 be/src/exec/catalog-op-executor.cc          |   1 +
 be/src/exec/data-sink.cc                    |  63 ---
 be/src/exec/data-sink.h                     |   9 -
 be/src/exec/hbase-table-sink.cc             |  11 +-
 be/src/exec/hdfs-table-sink.cc              |  38 +-
 be/src/exec/kudu-table-sink.cc              |  22 +-
 be/src/exec/plan-root-sink.cc               |   1 +
 be/src/runtime/CMakeLists.txt               |   1 +
 be/src/runtime/coordinator-backend-state.cc |  11 +-
 be/src/runtime/coordinator-backend-state.h  |  18 +-
 be/src/runtime/coordinator.cc               | 361 ++---------------
 be/src/runtime/coordinator.h                |  84 +---
 be/src/runtime/dml-exec-state.cc            | 494 +++++++++++++++++++++++
 be/src/runtime/dml-exec-state.h             | 149 +++++++
 be/src/runtime/query-state.cc               |  13 +-
 be/src/runtime/runtime-filter-bank.cc       |   1 +
 be/src/runtime/runtime-state.h              |  29 +-
 be/src/service/client-request-state.cc      |  10 +-
 be/src/service/client-request-state.h       |   1 +
 be/src/service/impala-beeswax-server.cc     |  18 +-
 be/src/service/impala-hs2-server.cc         |   1 +
 be/src/service/impala-http-handler.cc       |   2 +
 be/src/service/impala-server.cc             |   2 +
 be/src/service/impala-server.h              |   3 -
 be/src/testutil/in-process-servers.cc       |   1 +
 26 files changed, 750 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b10a70f..1b995a5 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -53,6 +53,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "service/fe-support.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"

 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 12398cf..164187c 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -25,6 +25,7 @@
 #include "runtime/lib-cache.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"
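Before the remaining per-file diffs, the sketch below shows the shape of the new
class. The method and member names are taken from the definitions and call sites
in this patch, but the declaration order, parameter types, and comment text are
approximate rather than a verbatim copy of be/src/runtime/dml-exec-state.h:

    /// Tracks and merges the status of a DML operation (INSERT/CTAS/Kudu DML)
    /// across all fragment instances. Every public method acquires 'lock_', so
    /// callers such as the Coordinator no longer need to hold their own lock
    /// while updating or reading this state.
    class DmlExecState {
     public:
      /// Updates made by the table sinks (HDFS, HBase, Kudu).
      void AddPartition(const std::string& name, int64_t id,
          const std::string* base_dir);
      void UpdatePartition(const std::string& partition_name,
          int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
      void AddFileToMove(const std::string& file_name,
          const std::string& location);
      void InitForKuduDml();
      void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
          int64_t latest_observed_ts);

      /// Aggregation and reporting on the coordinator side.
      void Update(const TInsertExecStatus& dml_exec_status);
      std::string OutputPartitionStats(const std::string& prefix);
      int64_t GetNumModifiedRows();
      uint64_t GetKuduLatestObservedTimestamp();
      bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
      Status FinalizeHdfsInsert(const TFinalizeParams& params,
          bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
          RuntimeProfile* profile);
      bool ToThrift(TInsertExecStatus* dml_status);
      void ToTInsertResult(TInsertResult* insert_result);

     private:
      /// Protects the members below; previously this state was guarded by the
      /// coordinator's lock.
      boost::mutex lock_;
      PartitionStatusMap per_partition_status_;
      FileMoveMap files_to_move_;

      /// Internal helpers, moved here from DataSink and Coordinator.
      void MergeDmlStats(const TInsertStats& src_stats, TInsertStats* dst_stats);
      typedef boost::unordered_map<std::string, std::pair<bool, short>>
          PermissionCache;
      void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
          PermissionCache* permissions_cache);
    };


http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.cc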
---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index f8f068e..9140b3e 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -121,69 +121,6 @@ Status DataSink::Init(const vector<TExpr>& thrift_output_exprs, return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_); } -void DataSink::MergeDmlStats(const TInsertStats& src_stats, - TInsertStats* dst_stats) { - dst_stats->bytes_written += src_stats.bytes_written; - if (src_stats.__isset.kudu_stats) { - if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats()); - if (!dst_stats->kudu_stats.__isset.num_row_errors) { - dst_stats->kudu_stats.__set_num_row_errors(0); - } - dst_stats->kudu_stats.__set_num_row_errors( - dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors); - } - if (src_stats.__isset.parquet_stats) { - if (dst_stats->__isset.parquet_stats) { - MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size, - &dst_stats->parquet_stats.per_column_size); - } else { - dst_stats->__set_parquet_stats(src_stats.parquet_stats); - } - } -} - -string DataSink::OutputDmlStats(const PartitionStatusMap& stats, - const string& prefix) { - const char* indent = " "; - stringstream ss; - ss << prefix; - bool first = true; - for (const PartitionStatusMap::value_type& val: stats) { - if (!first) ss << endl; - first = false; - ss << "Partition: "; - - const string& partition_key = val.first; - if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) { - ss << "Default" << endl; - } else { - ss << partition_key << endl; - } - if (val.second.__isset.num_modified_rows) { - ss << "NumModifiedRows: " << val.second.num_modified_rows << endl; - } - - if (!val.second.__isset.stats) continue; - const TInsertStats& stats = val.second.stats; - if (stats.__isset.kudu_stats) { - ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl; - } - - ss << indent << "BytesWritten: " - << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES); - if (stats.__isset.parquet_stats) { - const TParquetInsertStats& parquet_stats = stats.parquet_stats; - ss << endl << indent << "Per Column Sizes:"; - for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin(); - i != parquet_stats.per_column_size.end(); ++i) { - ss << endl << indent << indent << i->first << ": " - << PrettyPrinter::Print(i->second, TUnit::BYTES); - } - } - } - return ss.str(); -} - Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { DCHECK(parent_mem_tracker != nullptr); DCHECK(profile_ != nullptr); http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index d2e80af..605b46d 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -85,15 +85,6 @@ class DataSink { const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink); - /// Merges one update to the DML stats for a partition. dst_stats will have the - /// combined stats of src_stats and dst_stats after this method returns. 
- static void MergeDmlStats(const TInsertStats& src_stats, - TInsertStats* dst_stats); - - /// Outputs the DML stats contained in the map of partition updates to a string - static std::string OutputDmlStats(const PartitionStatusMap& stats, - const std::string& prefix = ""); - MemTracker* mem_tracker() const { return mem_tracker_.get(); } RuntimeProfile* profile() const { return profile_; } const std::vector<ScalarExprEvaluator*>& output_expr_evals() const { http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hbase-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc index 9e591ac..567e8bd 100644 --- a/be/src/exec/hbase-table-sink.cc +++ b/be/src/exec/hbase-table-sink.cc @@ -59,12 +59,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track RETURN_IF_ERROR(hbase_table_writer_->Init(state)); // Add a 'root partition' status in which to collect insert statistics - TInsertPartitionStatus root_status; - root_status.__set_num_modified_rows(0L); - root_status.__set_stats(TInsertStats()); - root_status.__set_id(-1L); - state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status)); - + state->dml_exec_state()->AddPartition(ROOT_PARTITION_KEY, -1L, nullptr); return Status::OK(); } @@ -74,8 +69,8 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) { RETURN_IF_ERROR(state->CheckQueryState()); // Since everything is set up just forward everything to the writer. RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch)); - (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows += - batch->num_rows(); + state->dml_exec_state()->UpdatePartition( + ROOT_PARTITION_KEY, batch->num_rows(), nullptr); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index c12b711..ca08bac 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -403,8 +403,9 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, COUNTER_ADD(files_created_counter_, 1); if (!ShouldSkipStaging(state, output_partition)) { - // Save the ultimate destination for this file (it will be moved by the coordinator) - (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location; + // Save the ultimate destination for this file (it will be moved by the coordinator). 
+ state->dml_exec_state()->AddFileToMove( + output_partition->current_file_name, final_location); } ++output_partition->num_files; @@ -573,21 +574,15 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple return status; } - // Save the partition name so that the coordinator can create the partition directory - // structure if needed - DCHECK(state->per_partition_status()->find(partition->partition_name) == - state->per_partition_status()->end()); - TInsertPartitionStatus partition_status; - partition_status.__set_num_modified_rows(0L); - partition_status.__set_id(partition_descriptor->id()); - partition_status.__set_stats(TInsertStats()); - partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir()); - state->per_partition_status()->insert( - make_pair(partition->partition_name, partition_status)); + // Save the partition name so that the coordinator can create the partition + // directory structure if needed. + state->dml_exec_state()->AddPartition( + partition->partition_name, partition_descriptor->id(), + &table_desc_->hdfs_base_dir()); if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) { - // Indicate that temporary directory is to be deleted after execution - (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = ""; + // Indicate that temporary directory is to be deleted after execution. + state->dml_exec_state()->AddFileToMove(partition->tmp_hdfs_dir_name, ""); } partition_keys_to_output_partitions_[key].first = std::move(partition); @@ -643,17 +638,8 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state, // OutputPartition writer could be nullptr if there is no row to output. if (partition->writer.get() != nullptr) { RETURN_IF_ERROR(partition->writer->Finalize()); - - // Track total number of appended rows per partition in runtime - // state. partition->num_rows counts number of rows appended is per-file. - PartitionStatusMap::iterator it = - state->per_partition_status()->find(partition->partition_name); - - // Should have been created in GetOutputPartition() when the partition was - // initialised. - DCHECK(it != state->per_partition_status()->end()); - it->second.num_modified_rows += partition->num_rows; - DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats); + state->dml_exec_state()->UpdatePartition( + partition->partition_name, partition->num_rows, &partition->writer->stats()); } RETURN_IF_ERROR(ClosePartitionFile(state, partition)); http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index 05f1d06..67bb86e 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -61,9 +61,6 @@ using kudu::client::KuduError; namespace impala { -const static string& ROOT_PARTITION_KEY = - g_ImpalaInternalService_constants.ROOT_PARTITION_KEY; - // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693). 
const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024; @@ -92,15 +89,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke << "TableDescriptor must be an instance KuduTableDescriptor."; table_desc_ = static_cast<const KuduTableDescriptor*>(table_desc); - // Add a 'root partition' status in which to collect write statistics - TInsertPartitionStatus root_status; - root_status.__set_num_modified_rows(0L); - root_status.__set_id(-1L); - TKuduDmlStats kudu_dml_stats; - kudu_dml_stats.__set_num_row_errors(0L); - root_status.__set_stats(TInsertStats()); - root_status.stats.__set_kudu_stats(kudu_dml_stats); - state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status)); + state->dml_exec_state()->InitForKuduDml(); // Add counters total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT); @@ -338,12 +327,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) { VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString(); } Status status = CheckForErrors(state); - TInsertPartitionStatus& insert_status = - (*state->per_partition_status())[ROOT_PARTITION_KEY]; - insert_status.__set_num_modified_rows( - total_rows_->value() - num_row_errors_->value()); - insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value()); - insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp()); + state->dml_exec_state()->SetKuduDmlStats( + total_rows_->value() - num_row_errors_->value(), num_row_errors_->value(), + client_->GetLatestObservedTimestamp()); return status; } http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index 1cdc544..836a376 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -117,6 +117,7 @@ void PlanRootSink::Close(RuntimeState* state) { unique_lock<mutex> l(lock_); // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as // well. + // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_? 
sender_done_ = true; consumer_cv_.NotifyAll(); // Wait for consumer to be done, in case sender tries to tear-down this sink while the http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 0d4b61c..89cbbb9 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -37,6 +37,7 @@ add_library(Runtime data-stream-sender.cc debug-options.cc descriptors.cc + dml-exec-state.cc exec-env.cc fragment-instance-state.cc hbase-table.cc http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 914a3e4..e8db00e 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -43,10 +43,7 @@ Coordinator::BackendState::BackendState( const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode) : query_id_(query_id), state_idx_(state_idx), - filter_mode_(filter_mode), - rpc_latency_(0), - rpc_sent_(false), - peak_consumption_(0L) { + filter_mode_(filter_mode) { } void Coordinator::BackendState::Init( @@ -413,11 +410,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats( const FInstanceExecParams& exec_params, FragmentStats* fragment_stats, ObjectPool* obj_pool) : exec_params_(exec_params), - profile_(nullptr), - done_(false), - profile_created_(false), - total_split_size_(0), - total_ranges_complete_(0) { + profile_(nullptr) { const string& profile_name = Substitute("Instance $0 (host=$1)", PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host)); profile_ = RuntimeProfile::Create(obj_pool, profile_name); http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index 0973ca3..d2f122c 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -161,24 +161,24 @@ class Coordinator::BackendState { int64_t last_report_time_ms_ = 0; /// owned by coordinator object pool provided in the c'tor, created in Update() - RuntimeProfile* profile_; + RuntimeProfile* profile_ = nullptr; /// true if the final report has been received for the fragment instance. /// Used to handle duplicate done ReportExecStatus RPC messages. 
Used only /// in ApplyExecStatusReport() - bool done_; + bool done_ = false; /// true after the first call to profile->Update() - bool profile_created_; + bool profile_created_ = false; /// cumulative size of all splits; set in c'tor - int64_t total_split_size_; + int64_t total_split_size_ = 0; /// wall clock timer for this instance MonotonicStopWatch stopwatch_; /// total scan ranges complete across all scan nodes - int64_t total_ranges_complete_; + int64_t total_ranges_complete_ = 0; /// SCAN_RANGES_COMPLETE_COUNTERs in profile_ std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_; @@ -215,7 +215,7 @@ class Coordinator::BackendState { boost::mutex lock_; // number of in-flight instances - int num_remaining_instances_; + int num_remaining_instances_ = 0; /// If the status indicates an error status, execution has either been aborted by the /// executing impalad (which then reported the error) or cancellation has been @@ -235,15 +235,15 @@ class Coordinator::BackendState { ErrorLogMap error_log_; /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC. - int64_t rpc_latency_; + int64_t rpc_latency_ = 0; /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be /// successful. - bool rpc_sent_; + bool rpc_sent_ = false; /// peak memory used for this query (value of that node's query memtracker's /// peak_consumption() - int64_t peak_consumption_; + int64_t peak_consumption_ = 0; /// Set in ApplyExecStatusReport(). Uses MonotonicMillis(). int64_t last_report_time_ms_ = 0; http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index a5d53b2..e6b3bca 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -63,10 +63,7 @@ using std::unique_ptr; DECLARE_int32(be_port); DECLARE_string(hostname); -DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by " - "INSERTs will inherit the permissions of their parent directories"); - -namespace impala { +using namespace impala; // Maximum number of fragment instances that can publish each broadcast filter. static const int MAX_BROADCAST_FILTER_PRODUCERS = 3; @@ -93,9 +90,6 @@ Status Coordinator::Exec() { const TQueryExecRequest& request = schedule_.request(); DCHECK(request.plan_exec_info.size() > 0); - needs_finalization_ = request.__isset.finalize_params; - if (needs_finalization_) finalize_params_ = request.finalize_params; - VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() << " stmt=" << request.query_ctx.client_request.stmt; stmt_type_ = request.stmt_type; @@ -489,291 +483,32 @@ Status Coordinator::UpdateStatus(const Status& status, const string& backend_hos return query_status_; } -void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str, - PermissionCache* permissions_cache) { - // Find out if the path begins with a hdfs:// -style prefix, and remove it and the - // location (e.g. host:port) if so. - int scheme_end = path_str.find("://"); - string stripped_str; - if (scheme_end != string::npos) { - // Skip past the subsequent location:port/ prefix. - stripped_str = path_str.substr(path_str.find("/", scheme_end + 3)); - } else { - stripped_str = path_str; - } - - // Get the list of path components, used to build all path prefixes. 
- vector<string> components; - split(components, stripped_str, is_any_of("/")); - - // Build a set of all prefixes (including the complete string) of stripped_path. So - // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d - vector<string> prefixes; - // Stores the current prefix - stringstream accumulator; - for (const string& component: components) { - if (component.empty()) continue; - accumulator << "/" << component; - prefixes.push_back(accumulator.str()); - } - - // Now for each prefix, stat() it to see if a) it exists and b) if so what its - // permissions are. When we meet a directory that doesn't exist, we record the fact that - // we need to create it, and the permissions of its parent dir to inherit. - // - // Every prefix is recorded in the PermissionCache so we don't do more than one stat() - // for each path. If we need to create the directory, we record it as the pair (true, - // perms) so that the caller can identify which directories need their permissions - // explicitly set. - - // Set to the permission of the immediate parent (i.e. the permissions to inherit if the - // current dir doesn't exist). - short permissions = 0; - for (const string& path: prefixes) { - PermissionCache::const_iterator it = permissions_cache->find(path); - if (it == permissions_cache->end()) { - hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str()); - if (info != nullptr) { - // File exists, so fill the cache with its current permissions. - permissions_cache->insert( - make_pair(path, make_pair(false, info->mPermissions))); - permissions = info->mPermissions; - hdfsFreeFileInfo(info, 1); - } else { - // File doesn't exist, so we need to set its permissions to its immediate parent - // once it's been created. - permissions_cache->insert(make_pair(path, make_pair(true, permissions))); - } - } else { - permissions = it->second.second; - } - } -} - -Status Coordinator::FinalizeSuccessfulInsert() { - PermissionCache permissions_cache; - HdfsFsCache::HdfsFsMap filesystem_connection_cache; - HdfsOperationSet partition_create_ops(&filesystem_connection_cache); - - // INSERT finalization happens in the five following steps - // 1. If OVERWRITE, remove all the files in the target directory - // 2. Create all the necessary partition directories. - HdfsTableDescriptor* hdfs_table; - RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx_.desc_tbl, - finalize_params_.table_id, obj_pool(), &hdfs_table)); - DCHECK(hdfs_table != nullptr) - << "INSERT target table not known in descriptor table: " - << finalize_params_.table_id; - - // Loop over all partitions that were updated by this insert, and create the set of - // filesystem operations required to create the correct partition structure on disk. - for (const PartitionStatusMap::value_type& partition: per_partition_status_) { - SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", - "FinalizationTimer")); - // INSERT allows writes to tables that have partitions on multiple filesystems. - // So we need to open connections to different filesystems as necessary. We use a - // local connection cache and populate it with one connection per filesystem that the - // partitions are on. - hdfsFS partition_fs_connection; - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( - partition.second.partition_base_dir, &partition_fs_connection, - &filesystem_connection_cache)); - - // Look up the partition in the descriptor table. 
- stringstream part_path_ss; - if (partition.second.id == -1) { - // If this is a non-existant partition, use the default partition location of - // <base_dir>/part_key_1=val/part_key_2=val/... - part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first; - } else { - HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id); - DCHECK(part != nullptr) - << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id - << "\n" << PrintThrift(runtime_state()->instance_ctx()); - part_path_ss << part->location(); - } - const string& part_path = part_path_ss.str(); - bool is_s3_path = IsS3APath(part_path.c_str()); - - // If this is an overwrite insert, we will need to delete any updated partitions - if (finalize_params_.is_overwrite) { - if (partition.first.empty()) { - // If the root directory is written to, then the table must not be partitioned - DCHECK(per_partition_status_.size() == 1); - // We need to be a little more careful, and only delete data files in the root - // because the tmp directories the sink(s) wrote are there also. - // So only delete files in the table directory - all files are treated as data - // files by Hive and Impala, but directories are ignored (and may legitimately - // be used to store permanent non-table data by other applications). - int num_files = 0; - // hfdsListDirectory() only sets errno if there is an error, but it doesn't set - // it to 0 if the call succeed. When there is no error, errno could be any - // value. So need to clear errno before calling it. - // Once HDFS-8407 is fixed, the errno reset won't be needed. - errno = 0; - hdfsFileInfo* existing_files = - hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); - if (existing_files == nullptr && errno == EAGAIN) { - errno = 0; - existing_files = - hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); - } - // hdfsListDirectory() returns nullptr not only when there is an error but also - // when the directory is empty(HDFS-8407). Need to check errno to make sure - // the call fails. - if (existing_files == nullptr && errno != 0) { - return Status(GetHdfsErrorMsg("Could not list directory: ", part_path)); - } - for (int i = 0; i < num_files; ++i) { - const string filename = path(existing_files[i].mName).filename().string(); - if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) { - partition_create_ops.Add(DELETE, existing_files[i].mName); - } - } - hdfsFreeFileInfo(existing_files, num_files); - } else { - // This is a partition directory, not the root directory; we can delete - // recursively with abandon, after checking that it ever existed. - // TODO: There's a potential race here between checking for the directory - // and a third-party deleting it. - if (FLAGS_insert_inherit_permissions && !is_s3_path) { - // There is no directory structure in S3, so "inheriting" permissions is not - // possible. - // TODO: Try to mimic inheriting permissions for S3. - PopulatePathPermissionCache( - partition_fs_connection, part_path, &permissions_cache); - } - // S3 doesn't have a directory structure, so we technically wouldn't need to - // CREATE_DIR on S3. However, libhdfs always checks if a path exists before - // carrying out an operation on that path. So we still need to call CREATE_DIR - // before we access that path due to this limitation. 
- if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) { - partition_create_ops.Add(DELETE_THEN_CREATE, part_path); - } else { - // Otherwise just create the directory. - partition_create_ops.Add(CREATE_DIR, part_path); - } - } - } else if (!is_s3_path - || !query_ctx_.client_request.query_options.s3_skip_insert_staging) { - // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories - // would have already been created by the table sinks. - if (FLAGS_insert_inherit_permissions && !is_s3_path) { - PopulatePathPermissionCache( - partition_fs_connection, part_path, &permissions_cache); - } - if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) { - partition_create_ops.Add(CREATE_DIR, part_path); - } - } - } - - // We're done with the HDFS descriptor - free up its resources. - hdfs_table->ReleaseResources(); - hdfs_table = nullptr; - - { - SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", - "FinalizationTimer")); - if (!partition_create_ops.Execute( - ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { - for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) { - // It's ok to ignore errors creating the directories, since they may already - // exist. If there are permission errors, we'll run into them later. - if (err.first->op() != CREATE_DIR) { - return Status(Substitute( - "Error(s) deleting partition directories. First error (of $0) was: $1", - partition_create_ops.errors().size(), err.second)); - } - } - } - } - - // 3. Move all tmp files - HdfsOperationSet move_ops(&filesystem_connection_cache); - HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache); - - for (FileMoveMap::value_type& move: files_to_move_) { - // Empty destination means delete, so this is a directory. These get deleted in a - // separate pass to ensure that we have moved all the contents of the directory first. - if (move.second.empty()) { - VLOG_ROW << "Deleting file: " << move.first; - dir_deletion_ops.Add(DELETE, move.first); - } else { - VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second; - if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) { - move_ops.Add(RENAME, move.first, move.second); - } else { - move_ops.Add(MOVE, move.first, move.second); - } - } - } - - { - SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer")); - if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { - stringstream ss; - ss << "Error(s) moving partition files. First error (of " - << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second; - return Status(ss.str()); - } - } - - // 4. Delete temp directories - { - SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer", - "FinalizationTimer")); - if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { - stringstream ss; - ss << "Error(s) deleting staging directories. First error (of " - << dir_deletion_ops.errors().size() << ") was: " - << dir_deletion_ops.errors()[0].second; - return Status(ss.str()); - } - } - - // 5. Optionally update the permissions of the created partition directories - // Do this last so that we don't make a dir unwritable before we write to it. 
- if (FLAGS_insert_inherit_permissions) { - HdfsOperationSet chmod_ops(&filesystem_connection_cache); - for (const PermissionCache::value_type& perm: permissions_cache) { - bool new_dir = perm.second.first; - if (new_dir) { - short permissions = perm.second.second; - VLOG_QUERY << "INSERT created new directory: " << perm.first - << ", inherited permissions are: " << oct << permissions; - chmod_ops.Add(CHMOD, perm.first, permissions); - } - } - if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { - stringstream ss; - ss << "Error(s) setting permissions on newly created partition directories. First" - << " error (of " << chmod_ops.errors().size() << ") was: " - << chmod_ops.errors()[0].second; - return Status(ss.str()); - } - } - - return Status::OK(); -} - -Status Coordinator::FinalizeQuery() { +Status Coordinator::FinalizeHdfsInsert() { // All instances must have reported their final statuses before finalization, which is a // post-condition of Wait. If the query was not successful, still try to clean up the // staging directory. DCHECK(has_called_wait_); - DCHECK(needs_finalization_); + DCHECK(finalize_params() != nullptr); VLOG_QUERY << "Finalizing query: " << query_id(); SCOPED_TIMER(finalization_timer_); Status return_status = GetStatus(); if (return_status.ok()) { - return_status = FinalizeSuccessfulInsert(); + HdfsTableDescriptor* hdfs_table; + RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl, + finalize_params()->table_id, obj_pool(), &hdfs_table)); + DCHECK(hdfs_table != nullptr) + << "INSERT target table not known in descriptor table: " + << finalize_params()->table_id; + return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(), + query_ctx().client_request.query_options.s3_skip_insert_staging, + hdfs_table, query_profile_); + hdfs_table->ReleaseResources(); } stringstream staging_dir; - DCHECK(finalize_params_.__isset.staging_dir); - staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id(),"_") << "/"; + DCHECK(finalize_params()->__isset.staging_dir); + staging_dir << finalize_params()->staging_dir << "/" << PrintId(query_id(),"_") << "/"; hdfsFS hdfs_conn; RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn)); @@ -813,26 +548,26 @@ Status Coordinator::Wait() { } DCHECK_EQ(stmt_type_, TStmtType::DML); - // Query finalization can only happen when all backends have reported - // relevant state. They only have relevant state to report in the parallel - // INSERT case, otherwise all the relevant state is from the coordinator - // fragment which will be available after Open() returns. - // Ignore the returned status if finalization is required., since FinalizeQuery() will - // pick it up and needs to execute regardless. + // Query finalization can only happen when all backends have reported relevant + // state. They only have relevant state to report in the parallel INSERT case, + // otherwise all the relevant state is from the coordinator fragment which will be + // available after Open() returns. Ignore the returned status if finalization is + // required., since FinalizeHdfsInsert() will pick it up and needs to execute + // regardless. Status status = WaitForBackendCompletion(); - if (!needs_finalization_ && !status.ok()) return status; + if (finalize_params() == nullptr && !status.ok()) return status; // Execution of query fragments has finished. We don't need to hold onto query execution // resources while we finalize the query. 
ReleaseExecResources(); // Query finalization is required only for HDFS table sinks - if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery()); + if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert()); // Release admission control resources after we'd done the potentially heavyweight // finalization. ReleaseAdmissionControlResources(); query_profile_->AddInfoString( - "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n")); + "DML Stats", dml_exec_state_.OutputPartitionStats("\n")); // For DML queries, when Wait is done, the query is complete. ComputeQuerySummary(); return status; @@ -929,7 +664,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param // TODO: only do this when the sink is done; probably missing a done field // in TReportExecStatus for that if (params.__isset.insert_exec_status) { - UpdateInsertExecStatus(params.insert_exec_status); + dml_exec_state_.Update(params.insert_exec_status); } if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) { @@ -971,52 +706,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param return Status::OK(); } -void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) { - lock_guard<mutex> l(lock_); - for (const PartitionStatusMap::value_type& partition: - insert_exec_status.per_partition_status) { - TInsertPartitionStatus* status = &(per_partition_status_[partition.first]); - status->__set_num_modified_rows( - status->num_modified_rows + partition.second.num_modified_rows); - status->__set_kudu_latest_observed_ts(std::max( - partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts)); - status->__set_id(partition.second.id); - status->__set_partition_base_dir(partition.second.partition_base_dir); - - if (partition.second.__isset.stats) { - if (!status->__isset.stats) status->__set_stats(TInsertStats()); - DataSink::MergeDmlStats(partition.second.stats, &status->stats); - } - } - files_to_move_.insert( - insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end()); -} - - -uint64_t Coordinator::GetLatestKuduInsertTimestamp() const { - uint64_t max_ts = 0; - for (const auto& entry : per_partition_status_) { - max_ts = std::max(max_ts, - static_cast<uint64_t>(entry.second.kudu_latest_observed_ts)); - } - return max_ts; -} - RuntimeState* Coordinator::runtime_state() { return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state(); } -bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) { - // Assume we are called only after all fragments have completed - DCHECK(has_called_wait_); - - for (const PartitionStatusMap::value_type& partition: per_partition_status_) { - catalog_update->created_partitions.insert(partition.first); - } - - return catalog_update->created_partitions.size() != 0; -} - // TODO: add histogram/percentile void Coordinator::ComputeQuerySummary() { // In this case, the query did not even get to start all fragment instances. @@ -1285,4 +978,8 @@ void Coordinator::FInstanceStatsToJson(Document* doc) { } doc->AddMember("backend_instances", states, doc->GetAllocator()); } + +const TFinalizeParams* Coordinator::finalize_params() const { + return schedule_.request().__isset.finalize_params + ? 
&schedule_.request().finalize_params : nullptr; } http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index d630b9a..6665c08 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -38,7 +38,8 @@ #include "common/status.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Types_types.h" -#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle +#include "runtime/dml-exec-state.h" +#include "runtime/query-state.h" #include "scheduling/query-schedule.h" #include "util/condition-variable.h" #include "util/progress-updater.h" @@ -130,9 +131,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Idempotent. void Cancel(const Status* cause = nullptr); - /// Updates execution status of a particular backend as well as Insert-related - /// status (per_partition_status_ and files_to_move_). Also updates - /// num_remaining_backends_ and cancels execution if the backend has an error status. + /// Updates execution status of a particular backend as well as dml_exec_state_. + /// Also updates num_remaining_backends_ and cancels execution if the backend has an + /// error status. Status UpdateBackendExecStatus(const TReportExecStatusParams& params) WARN_UNUSED_RESULT; @@ -149,17 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save MemTracker* query_mem_tracker() const; /// This is safe to call only after Wait() - const PartitionStatusMap& per_partition_status() { return per_partition_status_; } - - /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu - /// was executed, or 0 if there were no Kudu timestamps reported. - /// This should only be called after Wait(). - uint64_t GetLatestKuduInsertTimestamp() const; - - /// Gathers all updates to the catalog required once this query has completed execution. - /// Returns true if a catalog update is required, false otherwise. - /// Must only be called after Wait() - bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update); + DmlExecState* dml_exec_state() { return &dml_exec_state_; } /// Return error log for coord and all the fragments. The error messages from the /// individual fragment instances are merged into a single output to retain readability. @@ -229,12 +220,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// GetNext() hits eos. PlanRootSink* coord_sink_ = nullptr; - /// True if the query needs a post-execution step to tidy up - bool needs_finalization_ = false; - - /// Only valid if needs_finalization is true - TFinalizeParams finalize_params_; - /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this boost::mutex wait_lock_; @@ -275,6 +260,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save ExecSummary exec_summary_; + /// Filled in as the query completes and tracks the results of DML queries. This is + /// either the union of the reports from all fragment instances, or taken from the + /// coordinator fragment: only one of the two can legitimately produce updates. + DmlExecState dml_exec_state_; + /// Aggregate counters for the entire query. Lives in 'obj_pool_'. 
RuntimeProfile* query_profile_ = nullptr; @@ -308,21 +298,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// hits 0, any Wait()'ing thread is notified int num_remaining_backends_ = 0; - /// The following two structures, partition_row_counts_ and files_to_move_ are filled in - /// as the query completes, and track the results of INSERT queries that alter the - /// structure of tables. They are either the union of the reports from all fragment - /// instances, or taken from the coordinator fragment: only one of the two can - /// legitimately produce updates. - - /// The set of partitions that have been written to or updated by all fragment - /// instances, along with statistics such as the number of rows written (may be 0). For - /// unpartitioned tables, the empty string denotes the entire table. - PartitionStatusMap per_partition_status_; - - /// The set of files to move after an INSERT query has run, in (src, dest) form. An - /// empty string for the destination means that a file is to be deleted. - FileMoveMap files_to_move_; - /// Event timeline for this query. Not owned. RuntimeProfile::EventSequence* query_events_ = nullptr; @@ -357,6 +332,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Returns a local object pool. ObjectPool* obj_pool() { return obj_pool_.get(); } + /// Returns request's finalize params, or nullptr if not present. If not present, then + /// HDFS INSERT finalization is not required. + const TFinalizeParams* finalize_params() const; + + const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; } + /// Only valid *after* calling Exec(). Return nullptr if the running query does not /// produce any rows. RuntimeState* runtime_state(); @@ -381,9 +362,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save Status UpdateStatus(const Status& status, const std::string& backend_hostname, bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT; - /// Update per_partition_status_ and files_to_move_. - void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status); - /// Returns only when either all execution backends have reported success or the query /// is in error. Returns the status of the query. /// It is safe to call this concurrently, but any calls must be made only after Exec(). @@ -403,29 +381,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// profiles must not be updated while this is running. void ComputeQuerySummary(); - /// TODO: move the next 3 functions into a separate class - - /// Determines what the permissions of directories created by INSERT statements should - /// be if permission inheritance is enabled. Populates a map from all prefixes of - /// path_str (including the full path itself) which is a path in Hdfs, to pairs - /// (does_not_exist, permissions), where does_not_exist is true if the path does not - /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of - /// the most immediate ancestor of the path that does exist, i.e. the permissions that - /// the path should inherit when created. Otherwise permissions is set to the actual - /// permissions of the path. The PermissionCache argument is also used to cache the - /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the - /// same path. 
- typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache; - void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str, - PermissionCache* permissions_cache); - - /// Moves all temporary staging files to their final destinations. - Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT; - - /// Perform any post-query cleanup required. Called by Wait() only after all fragment - /// instances have returned, or if the query has failed, in which case it only cleans up - /// temporary data rather than finishing the INSERT in flight. - Status FinalizeQuery() WARN_UNUSED_RESULT; + /// Perform any post-query cleanup required for HDFS (or other Hadoop FileSystem) + /// INSERT. Called by Wait() only after all fragment instances have returned, or if + /// the query has failed, in which case it only cleans up temporary data rather than + /// finishing the INSERT in flight. + Status FinalizeHdfsInsert() WARN_UNUSED_RESULT; /// Populates backend_states_, starts query execution at all backends in parallel, and /// blocks until startup completes. http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc new file mode 100644 index 0000000..6853da5 --- /dev/null +++ b/be/src/runtime/dml-exec-state.cc @@ -0,0 +1,494 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/dml-exec-state.h" + +#include <boost/thread/locks.hpp> +#include <boost/thread/lock_guard.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/join.hpp> +#include <boost/filesystem.hpp> +#include <gutil/strings/substitute.h> + +#include "common/logging.h" +#include "util/pretty-printer.h" +#include "util/container-util.h" +#include "util/hdfs-bulk-ops.h" +#include "util/hdfs-util.h" +#include "util/runtime-profile-counters.h" +#include "runtime/descriptors.h" +#include "runtime/hdfs-fs-cache.h" +#include "runtime/exec-env.h" +#include "gen-cpp/ImpalaService_types.h" +#include "gen-cpp/ImpalaInternalService_constants.h" +#include "gen-cpp/ImpalaInternalService_types.h" +#include "gen-cpp/Frontend_types.h" + +#include "common/names.h" + +DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by " + "INSERTs will inherit the permissions of their parent directories"); + +using namespace impala; +using boost::algorithm::is_any_of; +using boost::algorithm::split; + +string DmlExecState::OutputPartitionStats(const string& prefix) { + lock_guard<mutex> l(lock_); + const char* indent = " "; + stringstream ss; + ss << prefix; + bool first = true; + for (const PartitionStatusMap::value_type& val: per_partition_status_) { + if (!first) ss << endl; + first = false; + ss << "Partition: "; + const string& partition_key = val.first; + if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) { + ss << "Default" << endl; + } else { + ss << partition_key << endl; + } + if (val.second.__isset.num_modified_rows) { + ss << "NumModifiedRows: " << val.second.num_modified_rows << endl; + } + + if (!val.second.__isset.stats) continue; + const TInsertStats& stats = val.second.stats; + if (stats.__isset.kudu_stats) { + ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl; + } + + ss << indent << "BytesWritten: " + << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES); + if (stats.__isset.parquet_stats) { + const TParquetInsertStats& parquet_stats = stats.parquet_stats; + ss << endl << indent << "Per Column Sizes:"; + for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin(); + i != parquet_stats.per_column_size.end(); ++i) { + ss << endl << indent << indent << i->first << ": " + << PrettyPrinter::Print(i->second, TUnit::BYTES); + } + } + } + return ss.str(); +} + +void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) { + lock_guard<mutex> l(lock_); + for (const PartitionStatusMap::value_type& partition: + dml_exec_status.per_partition_status) { + TInsertPartitionStatus* status = &(per_partition_status_[partition.first]); + status->__set_num_modified_rows( + status->num_modified_rows + partition.second.num_modified_rows); + status->__set_kudu_latest_observed_ts(max<uint64_t>( + partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts)); + status->__set_id(partition.second.id); + status->__set_partition_base_dir(partition.second.partition_base_dir); + + if (partition.second.__isset.stats) { + if (!status->__isset.stats) status->__set_stats(TInsertStats()); + MergeDmlStats(partition.second.stats, &status->stats); + } + } + files_to_move_.insert( + dml_exec_status.files_to_move.begin(), dml_exec_status.files_to_move.end()); +} + +void DmlExecState::AddFileToMove(const string& file_name, const string& location) { + lock_guard<mutex> l(lock_); + files_to_move_[file_name] = location; +} + +uint64_t DmlExecState::GetKuduLatestObservedTimestamp() { + 
lock_guard<mutex> l(lock_); + uint64_t max_ts = 0; + for (const auto& entry : per_partition_status_) { + max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts); + } + return max_ts; +} + +int64_t DmlExecState::GetNumModifiedRows() { + lock_guard<mutex> l(lock_); + int64_t result = 0; + for (const PartitionStatusMap::value_type& p: per_partition_status_) { + result += p.second.num_modified_rows; + } + return result; +} + +bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) { + lock_guard<mutex> l(lock_); + for (const PartitionStatusMap::value_type& partition: per_partition_status_) { + catalog_update->created_partitions.insert(partition.first); + } + return catalog_update->created_partitions.size() != 0; +} + +Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params, + bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table, + RuntimeProfile* profile) { + lock_guard<mutex> l(lock_); + PermissionCache permissions_cache; + HdfsFsCache::HdfsFsMap filesystem_connection_cache; + HdfsOperationSet partition_create_ops(&filesystem_connection_cache); + + // INSERT finalization happens in the five following steps + // 1. If OVERWRITE, remove all the files in the target directory + // 2. Create all the necessary partition directories. + + // Loop over all partitions that were updated by this insert, and create the set of + // filesystem operations required to create the correct partition structure on disk. + for (const PartitionStatusMap::value_type& partition: per_partition_status_) { + SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer", + "FinalizationTimer")); + // INSERT allows writes to tables that have partitions on multiple filesystems. + // So we need to open connections to different filesystems as necessary. We use a + // local connection cache and populate it with one connection per filesystem that the + // partitions are on. + hdfsFS partition_fs_connection; + RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( + partition.second.partition_base_dir, &partition_fs_connection, + &filesystem_connection_cache)); + + // Look up the partition in the descriptor table. + stringstream part_path_ss; + if (partition.second.id == -1) { + // If this is a non-existant partition, use the default partition location of + // <base_dir>/part_key_1=val/part_key_2=val/... + part_path_ss << params.hdfs_base_dir << "/" << partition.first; + } else { + HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id); + DCHECK(part != nullptr) + << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id; + part_path_ss << part->location(); + } + const string& part_path = part_path_ss.str(); + bool is_s3_path = IsS3APath(part_path.c_str()); + + // If this is an overwrite insert, we will need to delete any updated partitions + if (params.is_overwrite) { + if (partition.first.empty()) { + // If the root directory is written to, then the table must not be partitioned + DCHECK(per_partition_status_.size() == 1); + // We need to be a little more careful, and only delete data files in the root + // because the tmp directories the sink(s) wrote are there also. + // So only delete files in the table directory - all files are treated as data + // files by Hive and Impala, but directories are ignored (and may legitimately + // be used to store permanent non-table data by other applications). 
+ int num_files = 0; + // hfdsListDirectory() only sets errno if there is an error, but it doesn't set + // it to 0 if the call succeed. When there is no error, errno could be any + // value. So need to clear errno before calling it. + // Once HDFS-8407 is fixed, the errno reset won't be needed. + errno = 0; + hdfsFileInfo* existing_files = + hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); + if (existing_files == nullptr && errno == EAGAIN) { + errno = 0; + existing_files = + hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); + } + // hdfsListDirectory() returns nullptr not only when there is an error but also + // when the directory is empty(HDFS-8407). Need to check errno to make sure + // the call fails. + if (existing_files == nullptr && errno != 0) { + return Status(GetHdfsErrorMsg("Could not list directory: ", part_path)); + } + for (int i = 0; i < num_files; ++i) { + const string filename = + boost::filesystem::path(existing_files[i].mName).filename().string(); + if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) { + partition_create_ops.Add(DELETE, existing_files[i].mName); + } + } + hdfsFreeFileInfo(existing_files, num_files); + } else { + // This is a partition directory, not the root directory; we can delete + // recursively with abandon, after checking that it ever existed. + // TODO: There's a potential race here between checking for the directory + // and a third-party deleting it. + if (FLAGS_insert_inherit_permissions && !is_s3_path) { + // There is no directory structure in S3, so "inheriting" permissions is not + // possible. + // TODO: Try to mimic inheriting permissions for S3. + PopulatePathPermissionCache( + partition_fs_connection, part_path, &permissions_cache); + } + // S3 doesn't have a directory structure, so we technically wouldn't need to + // CREATE_DIR on S3. However, libhdfs always checks if a path exists before + // carrying out an operation on that path. So we still need to call CREATE_DIR + // before we access that path due to this limitation. + if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) { + partition_create_ops.Add(DELETE_THEN_CREATE, part_path); + } else { + // Otherwise just create the directory. + partition_create_ops.Add(CREATE_DIR, part_path); + } + } + } else if (!is_s3_path || !s3_skip_insert_staging) { + // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories + // would have already been created by the table sinks. + if (FLAGS_insert_inherit_permissions && !is_s3_path) { + PopulatePathPermissionCache( + partition_fs_connection, part_path, &permissions_cache); + } + if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) { + partition_create_ops.Add(CREATE_DIR, part_path); + } + } + } + + { + SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer", + "FinalizationTimer")); + if (!partition_create_ops.Execute( + ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { + for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) { + // It's ok to ignore errors creating the directories, since they may already + // exist. If there are permission errors, we'll run into them later. + if (err.first->op() != CREATE_DIR) { + return Status(Substitute( + "Error(s) deleting partition directories. First error (of $0) was: $1", + partition_create_ops.errors().size(), err.second)); + } + } + } + } + + // 3. 
+  // 3. Move all tmp files
+  HdfsOperationSet move_ops(&filesystem_connection_cache);
+  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
+
+  for (FileMoveMap::value_type& move: files_to_move_) {
+    // Empty destination means delete, so this is a directory. These get deleted in a
+    // separate pass to ensure that we have moved all the contents of the directory
+    // first.
+    if (move.second.empty()) {
+      VLOG_ROW << "Deleting file: " << move.first;
+      dir_deletion_ops.Add(DELETE, move.first);
+    } else {
+      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
+      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+        move_ops.Add(RENAME, move.first, move.second);
+      } else {
+        move_ops.Add(MOVE, move.first, move.second);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", "FinalizationTimer"));
+    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) moving partition files. First error (of "
+         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 4. Delete temp directories
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", "FinalizationTimer"));
+    if (!dir_deletion_ops.Execute(
+            ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) deleting staging directories. First error (of "
+         << dir_deletion_ops.errors().size() << ") was: "
+         << dir_deletion_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 5. Optionally update the permissions of the created partition directories.
+  // Do this last so that we don't make a dir unwritable before we write to it.
+  if (FLAGS_insert_inherit_permissions) {
+    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
+    for (const PermissionCache::value_type& perm: permissions_cache) {
+      bool new_dir = perm.second.first;
+      if (new_dir) {
+        short permissions = perm.second.second;
+        VLOG_QUERY << "INSERT created new directory: " << perm.first
+                   << ", inherited permissions are: " << oct << permissions;
+        chmod_ops.Add(CHMOD, perm.first, permissions);
+      }
+    }
+    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) setting permissions on newly created partition directories. First"
+         << " error (of " << chmod_ops.errors().size() << ") was: "
+         << chmod_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
+    PermissionCache* permissions_cache) {
+  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
+  // location (e.g. host:port) if so.
+  int scheme_end = path_str.find("://");
+  string stripped_str;
+  if (scheme_end != string::npos) {
+    // Skip past the subsequent location:port/ prefix.
+    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
+  } else {
+    stripped_str = path_str;
+  }
+
+  // Get the list of path components, used to build all path prefixes.
+  vector<string> components;
+  split(components, stripped_str, is_any_of("/"));
+
+  // Build a set of all prefixes (including the complete string) of stripped_path.
+  // So /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d.
+  vector<string> prefixes;
+  // Stores the current prefix.
+  stringstream accumulator;
+  for (const string& component: components) {
+    if (component.empty()) continue;
+    accumulator << "/" << component;
+    prefixes.push_back(accumulator.str());
+  }
+
+  // Now stat() each prefix to see a) whether it exists and b) if so, what its
+  // permissions are. When we meet a directory that doesn't exist, we record the fact
+  // that we need to create it, and the permissions of its parent dir for it to
+  // inherit.
+  //
+  // Every prefix is recorded in the PermissionCache so that we don't do more than one
+  // stat() for each path. If we need to create the directory, we record it as the
+  // pair (true, perms) so that the caller can identify which directories need their
+  // permissions explicitly set.
+
+  // Set to the permissions of the immediate parent (i.e. the permissions to inherit
+  // if the current dir doesn't exist).
+  short permissions = 0;
+  for (const string& path: prefixes) {
+    PermissionCache::const_iterator it = permissions_cache->find(path);
+    if (it == permissions_cache->end()) {
+      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
+      if (info != nullptr) {
+        // The file exists, so fill the cache with its current permissions.
+        permissions_cache->insert(
+            make_pair(path, make_pair(false, info->mPermissions)));
+        permissions = info->mPermissions;
+        hdfsFreeFileInfo(info, 1);
+      } else {
+        // The file doesn't exist, so we need to set its permissions to those of its
+        // immediate parent once it has been created.
+        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
+      }
+    } else {
+      permissions = it->second.second;
+    }
+  }
+}
+
+bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+  lock_guard<mutex> l(lock_);
+  bool set_thrift = false;
+  if (files_to_move_.size() > 0) {
+    dml_status->__set_files_to_move(files_to_move_);
+    set_thrift = true;
+  }
+  if (per_partition_status_.size() > 0) {
+    dml_status->__set_per_partition_status(per_partition_status_);
+    set_thrift = true;
+  }
+  return set_thrift;
+}
+
+void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
+  lock_guard<mutex> l(lock_);
+  int64_t num_row_errors = 0;
+  bool has_kudu_stats = false;
+  for (const PartitionStatusMap::value_type& v: per_partition_status_) {
+    insert_result->rows_modified[v.first] = v.second.num_modified_rows;
+    if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+      has_kudu_stats = true;
+    }
+    num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+  }
+  if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+}
+
+void DmlExecState::AddPartition(
+    const string& name, int64_t id, const string* base_dir) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(id);
+  status.__isset.stats = true;
+  if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+  per_partition_status_.insert(make_pair(name, status));
+}
+
+void DmlExecState::UpdatePartition(const string& partition_name,
+    int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.num_modified_rows += num_modified_rows_delta;
+  if (insert_stats == nullptr) return;
+  MergeDmlStats(*insert_stats, &entry->second.stats);
+}
+
+void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
+  dst->bytes_written += src.bytes_written;
+  if (src.__isset.kudu_stats) {
+    dst->__isset.kudu_stats = true;
+    if (!dst->kudu_stats.__isset.num_row_errors) {
+      dst->kudu_stats.__set_num_row_errors(0);
+    }
+    dst->kudu_stats.__set_num_row_errors(
+        dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+  }
+  if (src.__isset.parquet_stats) {
+    if (dst->__isset.parquet_stats) {
+      MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
+          &dst->parquet_stats.per_column_size);
+    } else {
+      dst->__set_parquet_stats(src.parquet_stats);
+    }
+  }
+}
+
+void DmlExecState::InitForKuduDml() {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(-1L);
+  status.__isset.stats = true;
+  status.stats.__isset.kudu_stats = true;
+  per_partition_status_.insert(make_pair(partition_name, status));
+}
+
+void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+    int64_t latest_ts) {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.__set_num_modified_rows(num_modified_rows);
+  entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
+  entry->second.__set_kudu_latest_observed_ts(latest_ts);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
new file mode 100644
index 0000000..728284a
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DML_EXEC_STATE_H
+#define IMPALA_RUNTIME_DML_EXEC_STATE_H
+
+#include <string>
+#include <map>
+#include <boost/unordered_map.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+
+namespace impala {
+
+class TInsertExecStatus;
+class TInsertResult;
+class TInsertStats;
+class TFinalizeParams;
+class TUpdateCatalogRequest;
+class TInsertPartitionStatus;
+class RuntimeProfile;
+class HdfsTableDescriptor;
+
+/// DmlExecState manages the state related to the execution of a DML statement
+/// (creation of new files, new partitions, etc.).
+///
+/// During DML execution, the table sink adds per-partition status using AddPartition()
+/// and then UpdatePartition() for non-Kudu tables. For Kudu tables, the sink adds DML
+/// stats using InitForKuduDml() followed by SetKuduDmlStats(). In the case of the
+/// HDFS sink, it also records the collection of files that should be moved by the
+/// coordinator on finalization, using AddFileToMove().
+///
+/// The state is then serialized to thrift and merged at the coordinator using
+/// Update(). The coordinator then uses OutputPartitionStats(),
+/// GetKuduLatestObservedTimestamp(), PrepareCatalogUpdate() and FinalizeHdfsInsert()
+/// to perform various finalization tasks.
+///
+/// Thread-safe.
+class DmlExecState {
+ public:
+  /// Merge values from 'dml_exec_status'.
+  void Update(const TInsertExecStatus& dml_exec_status);
+
+  /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
+  /// It is an error to call this for an existing partition.
+  void AddPartition(const std::string& name, int64_t id, const std::string* base_dir);
+
+  /// Merge the given values into the stats for the partition named 'partition_name',
+  /// which must already exist. Ignores 'insert_stats' if nullptr.
+  void UpdatePartition(const std::string& partition_name,
+      int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
+
+  /// Initializes this state for executing a Kudu DML statement. Must be called before
+  /// SetKuduDmlStats().
+  void InitForKuduDml();
+
+  /// Update stats for a Kudu DML sink. Requires that InitForKuduDml() was already
+  /// called.
+  void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+      int64_t latest_ts);
+
+  /// Adds a new file/location pair to the move map.
+  void AddFileToMove(const std::string& file_name, const std::string& location);
+
+  /// Outputs the partition stats to a string.
+  std::string OutputPartitionStats(const std::string& prefix);
+
+  /// Returns the latest Kudu timestamp observed across any backends where DML into
+  /// Kudu was executed, or 0 if there were no Kudu timestamps reported.
+  uint64_t GetKuduLatestObservedTimestamp();
+
+  /// Returns the total number of modified rows across all partitions.
+  int64_t GetNumModifiedRows();
+
+  /// Populates 'catalog_update' with PartitionStatusMap data.
+  /// Returns true if a catalog update is required, false otherwise.
+  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+
+  /// For HDFS (and other Hadoop FileSystem) INSERTs, moves all temporary staging files
+  /// to their final destinations, as indicated by 'params', and creates new partitions
+  /// for 'hdfs_table' as required. Adds child timers to 'profile' for the various
+  /// stages of finalization. If the table is on an S3 path and
+  /// 's3_skip_insert_staging' is true, does not create new partition directories.
+  Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
+      HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;
+
+  /// Serialize to thrift. Returns true if any fields of 'dml_status' were set.
+  bool ToThrift(TInsertExecStatus* dml_status);
+
+  /// Populates 'insert_result' with PartitionStatusMap data, for Impala's extension
+  /// of Beeswax.
+  void ToTInsertResult(TInsertResult* insert_result);
+
+ private:
+  /// Protects all fields below.
+  boost::mutex lock_;
+
+  /// Counts how many rows a DML query has added to a particular partition (partitions
+  /// are identified by their partition keys: k1=v1/k2=v2 etc.). Unpartitioned tables
+  /// have a single 'default' partition, which is identified by ROOT_PARTITION_KEY.
+  /// Uses an ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
+  PartitionStatusMap per_partition_status_;
+
+  /// Tracks files to move from a temporary (key) to a final destination (value) as
+  /// part of query finalization. If the destination is empty, the file is to be
+  /// deleted. Uses an ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, std::string> FileMoveMap;
+  FileMoveMap files_to_move_;
+
+  /// Determines what the permissions of directories created by INSERT statements
+  /// should be if permission inheritance is enabled. Populates a map from every
+  /// prefix of 'path_str' (a path in HDFS, including the full path itself) to pairs
+  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
+  /// exist in HDFS. If does_not_exist is true, permissions is set to the permissions
+  /// of the most immediate ancestor of the path that does exist, i.e. the permissions
+  /// that the path should inherit when created. Otherwise permissions is set to the
+  /// actual permissions of the path. The PermissionCache argument is also used to
+  /// cache the output across repeated calls, to avoid repeatedly calling
+  /// hdfsGetPathInfo() on the same path.
+  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
+  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
+      PermissionCache* permissions_cache);
+
+  /// Merge 'src' into 'dst'. Not thread-safe.
+  void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1cc646d..ad5748f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -236,18 +236,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   // Only send updates to insert status if fragment is finished, the coordinator waits
   // until query execution is done to use them anyhow.
  RuntimeState* state = fis->runtime_state();
-  if (done && (state->hdfs_files_to_move()->size() > 0
-      || state->per_partition_status()->size() > 0)) {
-    TInsertExecStatus insert_status;
-    if (state->hdfs_files_to_move()->size() > 0) {
-      insert_status.__set_files_to_move(*state->hdfs_files_to_move());
-    }
-    if (state->per_partition_status()->size() > 0) {
-      insert_status.__set_per_partition_status(*state->per_partition_status());
-    }
-    params.__set_insert_exec_status(insert_status);
+  if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) {
+    params.__isset.insert_exec_status = true;
   }
-
   // Send new errors to coordinator
   state->GetUnreportedErrors(&params.error_log);
   params.__isset.error_log = (params.error_log.size() > 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 239e066..4e23a42 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index cd2b061..4b005b2 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
 #include "runtime/thread-resource-mgr.h"
+#include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -56,22 +57,6 @@ namespace io {
 class DiskIoMgr;
 }
 
-/// TODO: move the typedefs into a separate .h (and fix the includes for that)
-
-/// Counts how many rows an INSERT query has added to a particular partition
-/// (partitions are identified by their partition keys: k1=v1/k2=v2
-/// etc. Unpartitioned tables have a single 'default' partition which is
-/// identified by ROOT_PARTITION_KEY.
-typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
-
-/// Stats per partition for insert queries. They key is the same as for PartitionRowCount
-typedef std::map<std::string, TInsertStats> PartitionInsertStats;
-
-/// Tracks files to move from a temporary (key) to a final destination (value) as
-/// part of query finalization. If the destination is empty, the file is to be
-/// deleted.
-typedef std::map<std::string, std::string> FileMoveMap;
-
 /// A collection of items that are part of the global state of a query and shared across
 /// all execution nodes of that query.
 /// After initialisation, callers must call ReleaseResources() to ensure that all
 /// resources are correctly freed before destruction.
@@ -133,8 +118,6 @@ class RuntimeState {
   }
 
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
-  FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
     root_node_id_ = id;
@@ -146,7 +129,7 @@ class RuntimeState {
 
   RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
-  PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return profile_; }
@@ -341,12 +324,8 @@ class RuntimeState {
   /// state is responsible for returning this pool to the thread mgr.
   ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
 
-  /// Temporary Hdfs files created, and where they should be moved to ultimately.
-  /// Mapping a filename to a blank destination causes it to be deleted.
-  FileMoveMap hdfs_files_to_move_;
-
-  /// Records summary statistics for the results of inserts into Hdfs partitions.
-  PartitionStatusMap per_partition_status_;
+  /// Execution state for DML statements.
+  DmlExecState dml_exec_state_;
 
   RuntimeProfile* const profile_;

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ac6178c..2aedcab 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include <limits>
 
 #include <gutil/strings/substitute.h>
+#include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -570,7 +571,8 @@ void ClientRequestState::Done() {
   // must happen before taking lock_ below.
   if (coord_.get() != NULL) {
     // This is safe to access on coord_ after Wait() has been called.
-    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    uint64_t latest_kudu_ts =
+        coord_->dml_exec_state()->GetKuduLatestObservedTimestamp();
     if (latest_kudu_ts > 0) {
       VLOG_RPC << "Updating session (id=" << session_id() << ") with latest "
                << "observed Kudu timestamp: " << latest_kudu_ts;
@@ -918,7 +920,7 @@ Status ClientRequestState::UpdateCatalog() {
       catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
       catalog_update.__set_header(TCatalogServiceRequestHeader());
       catalog_update.header.__set_requesting_user(effective_user());
-      if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+      if (!coord()->dml_exec_state()->PrepareCatalogUpdate(&catalog_update)) {
         VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                    << query_id() << ")";
       } else {
@@ -1027,9 +1029,7 @@ void ClientRequestState::SetCreateTableAsSelectResultSet() {
   // operation.
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
-    for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_modified_rows;
-    }
+    total_num_rows_inserted = coord_->dml_exec_state()->GetNumModifiedRows();
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
   VLOG_QUERY << summary_msg;

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0c05bc4..657f3de 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -24,6 +24,7 @@
 #include "scheduling/query-schedule.h"
 #include "service/child-query.h"
 #include "service/impala-server.h"
+#include "service/query-result-set.h"
 #include "util/auth-util.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b827ff3..c441285 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -20,10 +20,12 @@
 #include "common/logging.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/auth-util.h"
@@ -563,22 +565,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
       // need to revisit this, since that might lead us to insert a row without a
       // coordinator, depending on how we choose to drive the table sink.
-      int64_t num_row_errors = 0;
-      bool has_kudu_stats = false;
       if (request_state->coord() != nullptr) {
-        for (const PartitionStatusMap::value_type& v:
-            request_state->coord()->per_partition_status()) {
-          const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_modified[partition_status.first] =
-              partition_status.second.num_modified_rows;
-
-          if (partition_status.second.__isset.stats &&
-              partition_status.second.stats.__isset.kudu_stats) {
-            has_kudu_stats = true;
-          }
-          num_row_errors += partition_status.second.stats.kudu_stats.num_row_errors;
-        }
-        if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+        request_state->coord()->dml_exec_state()->ToTInsertResult(insert_result);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4381f81..80ace87 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -34,6 +34,7 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/raw-value.h"
 #include "runtime/exec-env.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 43b03d1..3841bfe 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,6 +24,7 @@
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -31,6 +32,7 @@
 #include "runtime/timestamp-value.inline.h"
 #include "service/impala-server.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"
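
[Editor's note] For readers following the new API, the sketch below illustrates the sink-side protocol described in dml-exec-state.h: register a partition, accumulate per-partition stats, and record staging files for the coordinator to move in FinalizeHdfsInsert(). It is not part of this commit; it assumes compilation inside the Impala source tree (for the generated thrift types), and the partition key, row counts, and file paths are hypothetical.

    // Illustrative only -- not part of this commit. Assumes the Impala tree and its
    // generated thrift types; all names and values below are hypothetical.
    #include "gen-cpp/ImpalaInternalService_types.h"
    #include "runtime/dml-exec-state.h"
    #include "runtime/runtime-state.h"

    namespace impala {

    // Sketch of what a non-Kudu HDFS table sink does with DmlExecState.
    void ExampleHdfsSinkUpdates(RuntimeState* state) {
      DmlExecState* dml_state = state->dml_exec_state();

      // Hypothetical partition key, in the k1=v1/k2=v2 form described above;
      // id == -1 marks a partition that doesn't exist in the descriptor table yet.
      const std::string partition = "year=2018/month=3";
      const std::string base_dir = "/warehouse/t";
      dml_state->AddPartition(partition, /*id=*/-1, &base_dir);

      // Accumulate per-partition stats as row batches are written.
      TInsertStats stats;
      stats.bytes_written = 4096;
      dml_state->UpdatePartition(partition, /*num_modified_rows_delta=*/100, &stats);

      // Record a staging file (key) and its final destination (value); an empty
      // destination would mean "delete during finalization".
      dml_state->AddFileToMove(
          "/warehouse/t/_impala_insert_staging/q1/f0.parq",
          "/warehouse/t/year=2018/month=3/f0.parq");
    }

    }  // namespace impala

On the coordinator side, these reports arrive via ToThrift()/Update() and are consumed by GetNumModifiedRows(), PrepareCatalogUpdate() and FinalizeHdfsInsert(), as the client-request-state.cc hunks above show.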
