IMPALA-2550: Switch to per-query exec rpc

Coordinator:
- FragmentInstanceState -> BackendState, which in turn records FragmentInstanceStats
QueryState - does query-wide setup in a separate thread (which also launches the instance exec threads) - has a query-wide 'prepared' state at which point all static setup is done and all FragmentInstanceStates are accessible Also renamed QueryExecState to ClientRequestState. Simplified handling of execution status (in FragmentInstanceState): - status only transmitted via ReportExecStatus rpc - in particular, it's not returned anymore from the Cancel rpc FIS: Fixed bugs related to partially-prepared state (in Close() and ReleaseThreadToken()) Change-Id: I20769e420711737b6b385c744cef4851cee3facd Reviewed-on: http://gerrit.cloudera.org:8080/6535 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/368115cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/368115cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/368115cd Branch: refs/heads/master Commit: 368115cdae98344e5c826f099582a60aa536951d Parents: 12f3ecc Author: Marcel Kornacker <[email protected]> Authored: Mon Oct 31 14:42:51 2016 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue May 9 04:04:50 2017 +0000 ---------------------------------------------------------------------- be/src/benchmarks/expr-benchmark.cc | 2 +- be/src/codegen/codegen-anyval.h | 2 + be/src/common/status.cc | 9 + be/src/common/status.h | 4 + be/src/exec/data-sink.cc | 36 +- be/src/exec/data-sink.h | 25 +- be/src/exec/exec-node.cc | 15 +- be/src/exec/exec-node.h | 6 +- be/src/exec/hdfs-avro-table-writer.cc | 2 + be/src/exec/hdfs-avro-table-writer.h | 3 + be/src/exec/hdfs-parquet-table-writer.cc | 1 + be/src/exec/hdfs-scan-node-base.cc | 8 +- be/src/exec/hdfs-scan-node-base.h | 8 +- be/src/exec/hdfs-table-sink.cc | 1 + be/src/exec/hdfs-table-sink.h | 2 +- be/src/exec/hdfs-table-writer.cc | 4 + 
be/src/exec/hdfs-table-writer.h | 16 +- be/src/exprs/expr-test.cc | 4 +- be/src/runtime/CMakeLists.txt | 3 +- be/src/runtime/buffered-block-mgr-test.cc | 4 +- be/src/runtime/buffered-tuple-stream-v2-test.cc | 1 + be/src/runtime/bufferpool/buffer-pool-test.cc | 1 + be/src/runtime/coordinator-backend-state.cc | 524 ++++++ be/src/runtime/coordinator-backend-state.h | 272 +++ be/src/runtime/coordinator-filter-state.h | 123 ++ be/src/runtime/coordinator.cc | 1613 +++++------------- be/src/runtime/coordinator.h | 378 ++-- be/src/runtime/data-stream-test.cc | 35 +- be/src/runtime/debug-options.cc | 83 + be/src/runtime/debug-options.h | 58 + be/src/runtime/descriptors.cc | 45 +- be/src/runtime/descriptors.h | 15 +- be/src/runtime/exec-env.cc | 5 +- be/src/runtime/exec-env.h | 6 +- be/src/runtime/fragment-instance-state.cc | 500 ++++-- be/src/runtime/fragment-instance-state.h | 242 ++- be/src/runtime/mem-tracker.cc | 4 +- be/src/runtime/mem-tracker.h | 2 +- be/src/runtime/plan-fragment-executor.cc | 518 ------ be/src/runtime/plan-fragment-executor.h | 305 ---- be/src/runtime/query-exec-mgr.cc | 82 +- be/src/runtime/query-exec-mgr.h | 34 +- be/src/runtime/query-state.cc | 271 ++- be/src/runtime/query-state.h | 179 +- be/src/runtime/runtime-filter-bank.cc | 5 +- be/src/runtime/runtime-state.cc | 66 +- be/src/runtime/runtime-state.h | 29 +- be/src/runtime/test-env.cc | 34 +- be/src/runtime/test-env.h | 3 +- be/src/scheduling/query-schedule.cc | 16 +- be/src/scheduling/query-schedule.h | 8 +- be/src/service/CMakeLists.txt | 2 +- be/src/service/child-query.cc | 9 +- be/src/service/child-query.h | 22 +- be/src/service/client-request-state.cc | 1085 ++++++++++++ be/src/service/client-request-state.h | 413 +++++ be/src/service/fe-support.cc | 5 +- be/src/service/impala-beeswax-server.cc | 121 +- be/src/service/impala-hs2-server.cc | 143 +- be/src/service/impala-http-handler.cc | 39 +- be/src/service/impala-internal-service.cc | 60 +- be/src/service/impala-internal-service.h | 8 +- 
be/src/service/impala-server.cc | 314 ++-- be/src/service/impala-server.h | 334 ++-- be/src/service/query-exec-state.cc | 1084 ------------ be/src/service/query-exec-state.h | 408 ----- be/src/testutil/desc-tbl-builder.cc | 2 +- be/src/testutil/fault-injection-util.h | 4 +- be/src/util/error-util-test.cc | 8 +- be/src/util/error-util.cc | 6 +- be/src/util/error-util.h | 10 +- be/src/util/uid-util.h | 4 + common/thrift/ExecStats.thrift | 1 + common/thrift/ImpalaInternalService.thrift | 220 ++- tests/common/test_result_verifier.py | 12 +- 75 files changed, 5014 insertions(+), 4912 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/benchmarks/expr-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc index c6d075d..c9fd80f 100644 --- a/be/src/benchmarks/expr-benchmark.cc +++ b/be/src/benchmarks/expr-benchmark.cc @@ -74,7 +74,7 @@ class Planner { query_ctx.client_request.query_options = query_options_; query_ctx.__set_session(session_state_); ImpalaServer::PrepareQueryContext(&query_ctx); - runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_, "")); + runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_)); return frontend_.GetExecRequest(query_ctx, result); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/codegen/codegen-anyval.h ---------------------------------------------------------------------- diff --git a/be/src/codegen/codegen-anyval.h b/be/src/codegen/codegen-anyval.h index 696f542..063004a 100644 --- a/be/src/codegen/codegen-anyval.h +++ b/be/src/codegen/codegen-anyval.h @@ -259,6 +259,8 @@ class CodegenAnyVal { : type_(INVALID_TYPE), value_(nullptr), name_(nullptr), codegen_(nullptr), builder_(nullptr) {} + LlvmCodeGen* codegen() const { return codegen_; } + private: ColumnType type_; 
llvm::Value* value_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/common/status.cc ---------------------------------------------------------------------- diff --git a/be/src/common/status.cc b/be/src/common/status.cc index e3b821c..1adc654 100644 --- a/be/src/common/status.cc +++ b/be/src/common/status.cc @@ -17,10 +17,13 @@ #include <boost/algorithm/string/join.hpp> +#include <ostream> + #include "common/status.h" #include "util/debug-util.h" #include "common/names.h" +#include "gen-cpp/ErrorCodes_types.h" namespace impala { @@ -233,4 +236,10 @@ void Status::CopyMessageFrom(const Status& status) noexcept { msg_ = status.msg_ == NULL ? NULL : new ErrorMsg(*status.msg_); } +ostream& operator<<(ostream& os, const Status& status) { + os << _TErrorCode_VALUES_TO_NAMES.at(status.code()); + if (!status.ok()) os << ": " << status.GetDetail(); + return os; +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/common/status.h ---------------------------------------------------------------------- diff --git a/be/src/common/status.h b/be/src/common/status.h index a3e44e0..9fa303b 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -19,6 +19,7 @@ #ifndef IMPALA_COMMON_STATUS_H #define IMPALA_COMMON_STATUS_H +#include <iosfwd> #include <string> #include <vector> @@ -260,6 +261,9 @@ class Status { ErrorMsg* msg_; }; +/// for debugging +std::ostream& operator<<(std::ostream& os, const Status& status); + /// some generally useful macros #define RETURN_IF_ERROR(stmt) \ do { \ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index 967eda3..dad6247 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -48,11 +48,12 @@ DataSink::~DataSink() { DCHECK(closed_); } -Status DataSink::CreateDataSink(ObjectPool* pool, 
- const TDataSink& thrift_sink, const vector<TExpr>& output_exprs, +Status DataSink::Create(ObjectPool* pool, + const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& fragment_instance_ctx, - const RowDescriptor& row_desc, scoped_ptr<DataSink>* sink) { - DataSink* tmp_sink = NULL; + const RowDescriptor& row_desc, DataSink** sink) { + const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink; + const vector<TExpr>& output_exprs = fragment_ctx.fragment.output_exprs; switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: if (!thrift_sink.__isset.stream_sink) { @@ -60,52 +61,45 @@ Status DataSink::CreateDataSink(ObjectPool* pool, } // TODO: figure out good buffer size based on size of output row - tmp_sink = new DataStreamSender(pool, - fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink, - fragment_instance_ctx.destinations, 16 * 1024); - sink->reset(tmp_sink); + *sink = pool->Add( + new DataStreamSender(pool, + fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink, + fragment_ctx.destinations, 16 * 1024)); break; case TDataSinkType::TABLE_SINK: if (!thrift_sink.__isset.table_sink) return Status("Missing table sink."); switch (thrift_sink.table_sink.type) { case TTableSinkType::HDFS: - tmp_sink = new HdfsTableSink(row_desc, output_exprs, thrift_sink); - sink->reset(tmp_sink); + *sink = pool->Add(new HdfsTableSink(row_desc, output_exprs, thrift_sink)); break; case TTableSinkType::HBASE: - tmp_sink = new HBaseTableSink(row_desc, output_exprs, thrift_sink); - sink->reset(tmp_sink); + *sink = pool->Add(new HBaseTableSink(row_desc, output_exprs, thrift_sink)); break; case TTableSinkType::KUDU: RETURN_IF_ERROR(CheckKuduAvailability()); - tmp_sink = new KuduTableSink(row_desc, output_exprs, thrift_sink); - sink->reset(tmp_sink); + *sink = pool->Add(new KuduTableSink(row_desc, output_exprs, thrift_sink)); break; default: stringstream error_msg; const char* str = "Unknown table sink"; map<int, const 
char*>::const_iterator i = _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type); - if (i != _TTableSinkType_VALUES_TO_NAMES.end()) { - str = i->second; - } + if (i != _TTableSinkType_VALUES_TO_NAMES.end()) str = i->second; error_msg << str << " not implemented."; return Status(error_msg.str()); } break; case TDataSinkType::PLAN_ROOT_SINK: - sink->reset(new PlanRootSink(row_desc, output_exprs, thrift_sink)); + *sink = pool->Add(new PlanRootSink(row_desc, output_exprs, thrift_sink)); break; default: stringstream error_msg; map<int, const char*>::const_iterator i = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type); const char* str = "Unknown data sink type "; - if (i != _TDataSinkType_VALUES_TO_NAMES.end()) { - str = i->second; - } + if (i != _TDataSinkType_VALUES_TO_NAMES.end()) str = i->second; error_msg << str << " not implemented."; return Status(error_msg.str()); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/data-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index 0fc8eca..f20c40b 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -23,22 +23,21 @@ #include <vector> #include "common/status.h" -#include "runtime/runtime-state.h" -#include "util/runtime-profile.h" -#include "gen-cpp/DataSinks_types.h" +#include "runtime/runtime-state.h" // for PartitionStatusMap +#include "runtime/mem-tracker.h" #include "gen-cpp/Exprs_types.h" namespace impala { -class MemTracker; class ObjectPool; class RowBatch; class RuntimeProfile; -class RuntimeState; +class RowDescriptor; +class TDataSink; class TPlanExecRequest; class TPlanExecParams; class TPlanFragmentInstanceCtx; -class RowDescriptor; +class TInsertStats; /// A data sink is an abstract interface for data sinks that consume RowBatches. E.g. 
/// a sink may write a HDFS table, send data across the network, or build hash tables @@ -58,8 +57,8 @@ class DataSink { virtual std::string GetName() = 0; /// Setup. Call before Send(), Open(), or Close() during the prepare phase of the query - /// fragment. Creates a MemTracker for the sink that is a child of 'parent_mem_tracker'. - /// Subclasses must call DataSink::Prepare(). + /// fragment. Creates a MemTracker (in obj_pool) for the sink that is a child of + /// 'parent_mem_tracker'. Subclasses must call DataSink::Prepare(). virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); /// Call before Send() to open the sink. @@ -78,12 +77,12 @@ class DataSink { /// Must be idempotent. virtual void Close(RuntimeState* state); - /// Creates a new data sink from thrift_sink. A pointer to the - /// new sink is written to *sink, and is owned by the caller. - static Status CreateDataSink(ObjectPool* pool, - const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs, + /// Creates a new data sink, allocated in pool and returned through *sink, from + /// thrift_sink. + static Status Create(ObjectPool* pool, + const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& fragment_instance_ctx, - const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink); + const RowDescriptor& row_desc, DataSink** sink); /// Merges one update to the DML stats for a partition. dst_stats will have the /// combined stats of src_stats and dst_stats after this method returns. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 02d4bf6..4c06ece 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -372,16 +372,17 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode, return Status::OK(); } -void ExecNode::SetDebugOptions( - int node_id, TExecNodePhase::type phase, TDebugAction::type action, - ExecNode* root) { - if (root->id_ == node_id) { - root->debug_phase_ = phase; - root->debug_action_ = action; +void ExecNode::SetDebugOptions(const TDebugOptions& debug_options, ExecNode* root) { + DCHECK(debug_options.__isset.node_id); + DCHECK(debug_options.__isset.phase); + DCHECK(debug_options.__isset.action); + if (root->id_ == debug_options.node_id) { + root->debug_phase_ = debug_options.phase; + root->debug_action_ = debug_options.action; return; } for (int i = 0; i < root->children_.size(); ++i) { - SetDebugOptions(node_id, phase, action, root->children_[i]); + SetDebugOptions(debug_options, root->children_[i]); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/exec-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index aa0c379..c769be3 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -43,6 +43,7 @@ class TupleRow; class DataSink; class MemTracker; class SubplanNode; +class TDebugOptions; /// Superclass of all executor nodes. 
/// All subclasses need to make sure to check RuntimeState::is_cancelled() @@ -133,9 +134,8 @@ class ExecNode { static Status CreateTree(RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) WARN_UNUSED_RESULT; - /// Set debug action for node with given id in 'tree' - static void SetDebugOptions( - int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode* tree); + /// Set debug action in 'tree' according to debug_options. + static void SetDebugOptions(const TDebugOptions& debug_options, ExecNode* tree); /// Collect all nodes of given 'node_type' that are part of this subtree, and return in /// 'nodes'. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-avro-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc index ec0ee08..00a51dd 100644 --- a/be/src/exec/hdfs-avro-table-writer.cc +++ b/be/src/exec/hdfs-avro-table-writer.cc @@ -24,6 +24,7 @@ #include <gutil/strings/substitute.h> #include "exec/exec-node.h" +#include "exec/hdfs-table-sink.h" #include "util/compress.h" #include "util/hdfs-util.h" #include "util/uid-util.h" @@ -35,6 +36,7 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/hdfs-fs-cache.h" +#include "runtime/types.h" #include "util/runtime-profile-counters.h" #include "write-stream.inline.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-avro-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h index 38e820e..01a79f7 100644 --- a/be/src/exec/hdfs-avro-table-writer.h +++ b/be/src/exec/hdfs-avro-table-writer.h @@ -22,13 +22,16 @@ #include <sstream> #include <string> +#include "common/status.h" #include "exec/hdfs-table-writer.h" +#include 
"runtime/mem-pool.h" #include "util/codec.h" #include "exec/write-stream.h" namespace impala { class Expr; +struct ColumnType; class TupleDescriptor; class TupleRow; class RuntimeState; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index 5c2d24c..ea0059d 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -18,6 +18,7 @@ #include "exec/hdfs-parquet-table-writer.h" #include "common/version.h" +#include "exec/hdfs-table-sink.h" #include "exec/parquet-column-stats.inline.h" #include "exprs/expr-context.h" #include "exprs/expr.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index b22e67c..e4ba44d 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -312,7 +312,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id); // Add per volume stats to the runtime profile - PerVolumnStats per_volume_stats; + PerVolumeStats per_volume_stats; stringstream str; UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats); PrintHdfsSplitStats(per_volume_stats, &str); @@ -808,7 +808,7 @@ void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const void HdfsScanNodeBase::UpdateHdfsSplitStats( const vector<TScanRangeParams>& scan_range_params_list, - PerVolumnStats* per_volume_stats) { + PerVolumeStats* per_volume_stats) { pair<int, int64_t> init_value(0, 0); for (const TScanRangeParams& scan_range_params: scan_range_params_list) { const 
TScanRange& scan_range = scan_range_params.scan_range; @@ -821,9 +821,9 @@ void HdfsScanNodeBase::UpdateHdfsSplitStats( } } -void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats, +void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats, stringstream* ss) { - for (PerVolumnStats::const_iterator i = per_volume_stats.begin(); + for (PerVolumeStats::const_iterator i = per_volume_stats.begin(); i != per_volume_stats.end(); ++i) { (*ss) << i->first << ":" << i->second.first << "/" << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " "; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 3d97e2e..3453945 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -268,16 +268,18 @@ class HdfsScanNodeBase : public ScanNode { } /// map from volume id to <number of split, per volume split lengths> - typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats; + /// TODO: move this into some global .h, no need to include this file just for this + /// typedef + typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumeStats; /// Update the per volume stats with the given scan range params list static void UpdateHdfsSplitStats( const std::vector<TScanRangeParams>& scan_range_params_list, - PerVolumnStats* per_volume_stats); + PerVolumeStats* per_volume_stats); /// Output the per_volume_stats to stringstream. The output format is a list of: /// <volume id>:<# splits>/<per volume split lengths> - static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats, + static void PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats, std::stringstream* ss); /// Description string for the per volume stats output. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index ef983e2..16833c1 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -16,6 +16,7 @@ // under the License. #include "exec/hdfs-table-sink.h" +#include "exec/hdfs-table-writer.h" #include "exec/hdfs-text-table-writer.h" #include "exec/hdfs-sequence-table-writer.h" #include "exec/hdfs-avro-table-writer.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index d452f3d..4ccd2fe 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -26,6 +26,7 @@ /// needed for scoped_ptr to work on ObjectPool #include "common/object-pool.h" #include "exec/data-sink.h" +#include "exec/hdfs-table-writer.h" #include "runtime/descriptors.h" namespace impala { @@ -34,7 +35,6 @@ class Expr; class TupleDescriptor; class TupleRow; class RuntimeState; -class HdfsTableWriter; class MemTracker; /// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc index 48349e3..b84915a 100644 --- a/be/src/exec/hdfs-table-writer.cc +++ b/be/src/exec/hdfs-table-writer.cc @@ -17,8 +17,12 @@ #include "exec/hdfs-table-writer.h" +#include <sstream> + #include "common/names.h" #include "runtime/mem-tracker.h" +#include "exec/hdfs-table-sink.h" +#include "util/hdfs-util.h" namespace impala { 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h index 86fe56c..cad304f 100644 --- a/be/src/exec/hdfs-table-writer.h +++ b/be/src/exec/hdfs-table-writer.h @@ -19,16 +19,22 @@ #ifndef IMPALA_EXEC_HDFS_TABLE_WRITER_H #define IMPALA_EXEC_HDFS_TABLE_WRITER_H +#include <vector> #include <hdfs.h> -#include <boost/scoped_ptr.hpp> -#include <boost/unordered_map.hpp> -#include "runtime/descriptors.h" -#include "exec/hdfs-table-sink.h" -#include "util/hdfs-util.h" +#include "common/status.h" +#include "gen-cpp/ImpalaInternalService_types.h" namespace impala { +class RuntimeState; +class OutputPartition; +class ExprContext; +class RowBatch; +class HdfsPartitionDescriptor; +class HdfsTableDescriptor; +class HdfsTableSink; + /// Pure virtual class for writing to hdfs table partition files. /// Subclasses implement the code needed to write to a specific file type. 
/// A subclass needs to implement functions to format and add rows to the file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 0de6833..a775e7b 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -1057,7 +1057,7 @@ void TestSingleLiteralConstruction( const ColumnType& type, const T& value, const string& string_val) { ObjectPool pool; RowDescriptor desc; - RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"}; + RuntimeState state(TQueryCtx(), ExecEnv::GetInstance()); MemTracker tracker; Expr* expr = pool.Add(new Literal(type, value)); @@ -1074,7 +1074,7 @@ TEST_F(ExprTest, NullLiteral) { for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) { NullLiteral expr(static_cast<PrimitiveType>(type)); ExprContext ctx(&expr); - RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"}; + RuntimeState state(TQueryCtx(), ExecEnv::GetInstance()); MemTracker tracker; EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker)); EXPECT_OK(ctx.Open(&state)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 773192d..2de0f2e 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -29,9 +29,11 @@ add_library(Runtime buffered-tuple-stream-v2.cc client-cache.cc coordinator.cc + coordinator-backend-state.cc data-stream-mgr.cc data-stream-sender.cc data-stream-recvr.cc + debug-options.cc descriptors.cc disk-io-mgr.cc disk-io-mgr-reader-context.cc @@ -48,7 +50,6 @@ add_library(Runtime mem-pool.cc multi-precision.cc parallel-executor.cc - plan-fragment-executor.cc query-exec-mgr.cc query-state.cc test-env.cc 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 65fd168..b8b90ee 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -573,7 +573,7 @@ class BufferedBlockMgrTest : public ::testing::Test { const int num_threads = 8; thread_group workers; // Create a shared RuntimeState with no BufferedBlockMgr. - RuntimeState shared_state(TQueryCtx(), test_env_->exec_env(), "test-pool"); + RuntimeState shared_state(TQueryCtx(), test_env_->exec_env()); for (int i = 0; i < num_threads; ++i) { thread* t = new thread( @@ -978,7 +978,7 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) { // Cancel the runtime state and re-pin the blocks while writes are in flight to check // that WriteComplete() handles the case ok. 
- state->set_is_cancelled(true); + state->set_is_cancelled(); PinBlocks(blocks); WaitForWrites(block_mgr); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/buffered-tuple-stream-v2-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc index da75212..1ae9181 100644 --- a/be/src/runtime/buffered-tuple-stream-v2-test.cc +++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc @@ -26,6 +26,7 @@ #include "codegen/llvm-codegen.h" #include "gutil/gscoped_ptr.h" #include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/query-state.h" #include "runtime/bufferpool/reservation-tracker.h" #include "runtime/collection-value-builder.h" #include "runtime/collection-value.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index dae7bcd..6b9177e 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -34,6 +34,7 @@ #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" #include "runtime/test-env.h" +#include "runtime/query-state.h" #include "service/fe-support.h" #include "testutil/cpu-util.h" #include "testutil/death-test-util.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc new file mode 100644 index 0000000..cd3a741 --- /dev/null +++ b/be/src/runtime/coordinator-backend-state.cc @@ -0,0 +1,524 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/coordinator-backend-state.h" + +#include <sstream> +#include <boost/thread/locks.hpp> +#include <boost/thread/lock_guard.hpp> +#include <boost/accumulators/accumulators.hpp> + +#include "common/object-pool.h" +#include "exec/exec-node.h" +#include "exec/scan-node.h" +#include "scheduling/query-schedule.h" +#include "runtime/exec-env.h" +#include "runtime/fragment-instance-state.h" +#include "runtime/debug-options.h" +#include "runtime/client-cache.h" +#include "runtime/client-cache-types.h" +#include "runtime/backend-client.h" +#include "runtime/coordinator-filter-state.h" +#include "util/error-util.h" +#include "util/uid-util.h" +#include "util/network-util.h" +#include "util/counting-barrier.h" +#include "util/progress-updater.h" +#include "gen-cpp/Types_types.h" +#include "gen-cpp/ImpalaInternalService_types.h" +#include "gen-cpp/ImpalaInternalService_constants.h" +#include "common/names.h" + +using namespace impala; +namespace accumulators = boost::accumulators; + +Coordinator::BackendState::BackendState( + const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode) + : query_id_(query_id), + state_idx_(state_idx), + filter_mode_(filter_mode), + 
rpc_latency_(0), + rpc_sent_(false), + peak_consumption_(0L) { +} + +void Coordinator::BackendState::Init( + const vector<const FInstanceExecParams*>& instance_params_list, + const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) { + instance_params_list_ = instance_params_list; + host_ = instance_params_list_[0]->host; + num_remaining_instances_ = instance_params_list.size(); + + // populate instance_stats_map_ and install instance + // profiles as child profiles in fragment_stats' profile + int prev_fragment_idx = -1; + for (const FInstanceExecParams* instance_params: instance_params_list) { + DCHECK_EQ(host_, instance_params->host); // all hosts must be the same + int fragment_idx = instance_params->fragment().idx; + DCHECK_LT(fragment_idx, fragment_stats.size()); + if (prev_fragment_idx != -1 && fragment_idx != prev_fragment_idx) { + // all instances of a fragment are contiguous + DCHECK_EQ(fragments_.count(fragment_idx), 0); + prev_fragment_idx = fragment_idx; + } + fragments_.insert(fragment_idx); + + instance_stats_map_.emplace( + GetInstanceIdx(instance_params->instance_id), + obj_pool->Add( + new InstanceStats(*instance_params, fragment_stats[fragment_idx], obj_pool))); + } +} + +void Coordinator::BackendState::SetRpcParams( + const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table, + TExecQueryFInstancesParams* rpc_params) { + rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); + rpc_params->__set_coord_state_idx(state_idx_); + + // set fragment_ctxs and fragment_instance_ctxs + rpc_params->fragment_instance_ctxs.resize(instance_params_list_.size()); + for (int i = 0; i < instance_params_list_.size(); ++i) { + TPlanFragmentInstanceCtx& instance_ctx = rpc_params->fragment_instance_ctxs[i]; + const FInstanceExecParams& params = *instance_params_list_[i]; + int fragment_idx = params.fragment_exec_params.fragment.idx; + + // add a TPlanFragmentCtx, if we don't already have it + if 
(rpc_params->fragment_ctxs.empty() + || rpc_params->fragment_ctxs.back().fragment.idx != fragment_idx) { + rpc_params->fragment_ctxs.emplace_back(); + TPlanFragmentCtx& fragment_ctx = rpc_params->fragment_ctxs.back(); + fragment_ctx.__set_fragment(params.fragment_exec_params.fragment); + fragment_ctx.__set_destinations(params.fragment_exec_params.destinations); + } + + instance_ctx.fragment_idx = fragment_idx; + instance_ctx.fragment_instance_id = params.instance_id; + instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; + instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); + instance_ctx.__set_per_exch_num_senders( + params.fragment_exec_params.per_exch_num_senders); + instance_ctx.__set_sender_id(params.sender_id); + if (debug_options.node_id() != -1 + && (debug_options.instance_idx() == -1 + || debug_options.instance_idx() == GetInstanceIdx(params.instance_id))) { + instance_ctx.__set_debug_options(debug_options.ToThrift()); + } + + if (filter_mode_ == TRuntimeFilterMode::OFF) continue; + + // Remove filters that weren't selected during filter routing table construction. 
+ // TODO: do this more efficiently, we're looping over the entire plan for each + // instance separately + DCHECK_EQ(rpc_params->query_ctx.client_request.query_options.mt_dop, 0); + int instance_idx = GetInstanceIdx(params.instance_id); + for (TPlanNode& plan_node: rpc_params->fragment_ctxs.back().fragment.plan.nodes) { + if (!plan_node.__isset.hash_join_node) continue; + if (!plan_node.__isset.runtime_filters) continue; + + vector<TRuntimeFilterDesc> required_filters; + for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) { + FilterRoutingTable::const_iterator filter_it = + filter_routing_table.find(desc.filter_id); + // filter was dropped in Coordinator::InitFilterRoutingTable() + if (filter_it == filter_routing_table.end()) continue; + const FilterState& f = filter_it->second; + if (f.src_fragment_instance_idxs().find(instance_idx) + == f.src_fragment_instance_idxs().end()) { + DCHECK(desc.is_broadcast_join); + continue; + } + // We don't need a target-side check here, because a filter is either sent to + // all its targets or none, and the none case is handled by checking if the + // filter is in the routing table. 
+ required_filters.push_back(desc); + } + plan_node.__set_runtime_filters(required_filters); + } + } +} + +void Coordinator::BackendState::Exec( + const TQueryCtx& query_ctx, const DebugOptions& debug_options, + const FilterRoutingTable& filter_routing_table, + CountingBarrier* exec_complete_barrier) { + NotifyBarrierOnExit notifier(exec_complete_barrier); + TExecQueryFInstancesParams rpc_params; + rpc_params.__set_query_ctx(query_ctx); + SetRpcParams(debug_options, filter_routing_table, &rpc_params); + VLOG_FILE << "making rpc: ExecQueryFInstances" + << " host=" << impalad_address() << " query_id=" << PrintId(query_id_); + + // guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns + lock_guard<mutex> l(lock_); + int64_t start = MonotonicMillis(); + + ImpalaBackendConnection backend_client( + ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status_); + if (!status_.ok()) return; + + TExecQueryFInstancesResult thrift_result; + Status rpc_status = backend_client.DoRpc( + &ImpalaBackendClient::ExecQueryFInstances, rpc_params, &thrift_result); + rpc_sent_ = true; + rpc_latency_ = MonotonicMillis() - start; + + const string ERR_TEMPLATE = + "ExecQueryFInstances rpc query_id=$0 failed: $1"; + + if (!rpc_status.ok()) { + const string& err_msg = + Substitute(ERR_TEMPLATE, PrintId(query_id_), rpc_status.msg().msg()); + VLOG_QUERY << err_msg; + status_ = Status(err_msg); + return; + } + + Status exec_status = Status(thrift_result.status); + if (!exec_status.ok()) { + const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id_), + exec_status.msg().GetFullMessageDetails()); + VLOG_QUERY << err_msg; + status_ = Status(err_msg); + return; + } + + for (const auto& entry: instance_stats_map_) entry.second->stopwatch_.Start(); + VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_); +} + +Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) { + lock_guard<mutex> l(lock_); + 
if (!status_.ok() && failed_instance_id != nullptr) { + *failed_instance_id = failed_instance_id_; + } + return status_; +} + +int64_t Coordinator::BackendState::GetPeakConsumption() { + lock_guard<mutex> l(lock_); + return peak_consumption_; +} + +void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) { + lock_guard<mutex> l(lock_); + if (error_log_.size() > 0) MergeErrorMaps(error_log_, merged); +} + +bool Coordinator::BackendState::IsDone() { + lock_guard<mutex> l(lock_); + return IsDoneInternal(); +} + +inline bool Coordinator::BackendState::IsDoneInternal() const { + return num_remaining_instances_ == 0 || !status_.ok(); +} + +void Coordinator::BackendState::ApplyExecStatusReport( + const TReportExecStatusParams& backend_exec_status, ExecSummary* exec_summary, + ProgressUpdater* scan_range_progress, bool* done) { + lock_guard<SpinLock> l1(exec_summary->lock); + lock_guard<mutex> l2(lock_); + for (const TFragmentInstanceExecStatus& instance_exec_status: + backend_exec_status.instance_exec_status) { + Status instance_status(instance_exec_status.status); + if (instance_status.ok()) { + int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id); + DCHECK_EQ(instance_stats_map_.count(instance_idx), 1); + InstanceStats* instance_stats = instance_stats_map_[instance_idx]; + DCHECK_EQ(instance_stats->exec_params_.instance_id, + instance_exec_status.fragment_instance_id); + instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress); + if (instance_stats->peak_mem_counter_ != nullptr) { + // protect against out-of-order status updates + peak_consumption_ = + max(peak_consumption_, instance_stats->peak_mem_counter_->value()); + } + } else { + // if a query is aborted due to an error encountered by a single fragment instance, + // all other fragment instances will report a cancelled status; make sure not + // to mask the original error status + if (status_.ok() || status_.IsCancelled()) { + status_ = instance_status; + 
failed_instance_id_ = instance_exec_status.fragment_instance_id; + } + } + DCHECK_GT(num_remaining_instances_, 0); + if (instance_exec_status.done) --num_remaining_instances_; + + // TODO: clean up the ReportQuerySummary() mess + if (status_.ok()) { + // We can't update this backend's profile if ReportQuerySummary() is running, + // because it depends on all profiles not changing during its execution (when it + // calls SortChildren()). ReportQuerySummary() only gets called after + // WaitForBackendCompletion() returns or at the end of CancelFragmentInstances(). + // WaitForBackendCompletion() only returns after all backends have completed (in + // which case we wouldn't be in this function), or when there's an error, in which + // case CancelFragmentInstances() is called. CancelFragmentInstances sets all + // exec_state's statuses to cancelled. + // TODO: We're losing this profile information. Call ReportQuerySummary only after + // all backends have completed. + } + } + + // Log messages aggregated by type + if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size() > 0) { + // Append the log messages from each update with the global state of the query + // execution + MergeErrorMaps(backend_exec_status.error_log, &error_log_); + VLOG_FILE << "host=" << host_ << " error log: " << PrintErrorMapToString(error_log_); + } + + *done = IsDoneInternal(); + // TODO: keep backend-wide stopwatch? 
+} + +void Coordinator::BackendState::UpdateExecStats( + const vector<FragmentStats*>& fragment_stats) { + lock_guard<mutex> l(lock_); + for (const auto& entry: instance_stats_map_) { + const InstanceStats& instance_stats = *entry.second; + int fragment_idx = instance_stats.exec_params_.fragment().idx; + DCHECK_LT(fragment_idx, fragment_stats.size()); + FragmentStats* f = fragment_stats[fragment_idx]; + int64_t completion_time = instance_stats.stopwatch_.ElapsedTime(); + f->completion_times_(completion_time); + if (completion_time > 0) { + f->rates_(instance_stats.total_split_size_ + / (completion_time / 1000.0 / 1000.0 / 1000.0)); + } + f->avg_profile_->UpdateAverage(instance_stats.profile_); + } +} + +bool Coordinator::BackendState::Cancel() { + lock_guard<mutex> l(lock_); + + // Nothing to cancel if the exec rpc was not sent + if (!rpc_sent_) return false; + + // don't cancel if it already finished (for any reason) + if (IsDoneInternal()) return false; + + /// If the status is not OK, we still try to cancel - !OK status might mean + /// communication failure between backend and coordinator, but fragment + /// instances might still be running. + + // set an error status to make sure we only cancel this once + if (status_.ok()) status_ = Status::CANCELLED; + + Status status; + ImpalaBackendConnection backend_client( + ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status); + if (!status.ok()) return false; + TCancelQueryFInstancesParams params; + params.protocol_version = ImpalaInternalServiceVersion::V1; + params.__set_query_id(query_id_); + TCancelQueryFInstancesResult dummy; + VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" + << query_id_ << " backend=" << impalad_address(); + Status rpc_status; + // Try to send the RPC 3 times before failing. 
+ bool retry_is_safe; + for (int i = 0; i < 3; ++i) { + rpc_status = backend_client.DoRpc( + &ImpalaBackendClient::CancelQueryFInstances, params, &dummy, &retry_is_safe); + if (rpc_status.ok() || !retry_is_safe) break; + } + if (!rpc_status.ok()) { + status_.MergeStatus(rpc_status); + stringstream msg; + msg << "CancelQueryFInstances rpc query_id=" << query_id_ + << " failed: " << rpc_status.msg().msg(); + status_.AddDetail(msg.str()); + return true; + } + return true; +} + +void Coordinator::BackendState::PublishFilter( + shared_ptr<TPublishFilterParams> rpc_params) { + DCHECK_EQ(rpc_params->dst_query_id, query_id_); + if (fragments_.count(rpc_params->dst_fragment_idx) == 0) return; + Status status; + ImpalaBackendConnection backend_client( + ExecEnv::GetInstance()->impalad_client_cache(), host_, &status); + if (!status.ok()) return; + // Make a local copy of the shared 'master' set of parameters + TPublishFilterParams local_params(*rpc_params); + local_params.__set_bloom_filter(rpc_params->bloom_filter); + TPublishFilterResult res; + backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res); + // TODO: switch back to the following once we fix the lifecycle + // problems of Coordinator + //std::cref(fragment_inst->impalad_address()), + //std::cref(fragment_inst->fragment_instance_id()))); +} + +Coordinator::BackendState::InstanceStats::InstanceStats( + const FInstanceExecParams& exec_params, FragmentStats* fragment_stats, + ObjectPool* obj_pool) + : exec_params_(exec_params), + profile_(nullptr), + profile_created_(false), + total_split_size_(0), + total_ranges_complete_(0) { + const string& profile_name = Substitute("Instance $0 (host=$1)", + PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host)); + profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); + fragment_stats->root_profile()->AddChild(profile_); + + // add total split size to fragment_stats->bytes_assigned() + for (const 
PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) { + for (const TScanRangeParams& scan_range_params: entry.second) { + if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue; + total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length; + } + } + (*fragment_stats->bytes_assigned())(total_split_size_); +} + +void Coordinator::BackendState::InstanceStats::InitCounters() { + vector<RuntimeProfile*> children; + profile_->GetAllChildren(&children); + for (RuntimeProfile* p: children) { + PlanNodeId id = ExecNode::GetNodeIdFromProfile(p); + // This profile is not for an exec node. + if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue; + + RuntimeProfile::Counter* c = + p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER); + if (c != nullptr) scan_ranges_complete_counters_.push_back(c); + } + + peak_mem_counter_ = + profile_->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER); +} + +void Coordinator::BackendState::InstanceStats::Update( + const TFragmentInstanceExecStatus& exec_status, + ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) { + DCHECK(Status(exec_status.status).ok()); + if (exec_status.done) stopwatch_.Stop(); + profile_->Update(exec_status.profile); + if (!profile_created_) { + profile_created_ = true; + InitCounters(); + } + profile_->ComputeTimeInProfile(); + + // update exec_summary + // TODO: why do this every time we get an updated instance profile? 
+ vector<RuntimeProfile*> children; + profile_->GetAllChildren(&children); + + TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary; + for (RuntimeProfile* child: children) { + int node_id = ExecNode::GetNodeIdFromProfile(child); + if (node_id == -1) continue; + + // TODO: create plan_node_id_to_summary_map_ + TPlanNodeExecSummary& node_exec_summary = + thrift_exec_summary.nodes[exec_summary->node_id_to_idx_map[node_id]]; + int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx; + DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size()) + << " node_id=" << node_id << " instance_id=" << PrintId(exec_params_.instance_id) + << " fragment_idx=" << exec_params_.fragment().idx; + TExecStats& instance_stats = + node_exec_summary.exec_stats[per_fragment_instance_idx]; + + RuntimeProfile::Counter* rows_counter = child->GetCounter("RowsReturned"); + RuntimeProfile::Counter* mem_counter = child->GetCounter("PeakMemoryUsage"); + if (rows_counter != nullptr) instance_stats.__set_cardinality(rows_counter->value()); + if (mem_counter != nullptr) instance_stats.__set_memory_used(mem_counter->value()); + instance_stats.__set_latency_ns(child->local_time()); + // TODO: track interesting per-node metrics + node_exec_summary.__isset.exec_stats = true; + } + + // determine newly-completed scan ranges and update scan_range_progress + int64_t total = 0; + for (RuntimeProfile::Counter* c: scan_ranges_complete_counters_) total += c->value(); + int64_t delta = total - total_ranges_complete_; + total_ranges_complete_ = total; + scan_range_progress->Update(delta); +} + +Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name, + const string& root_profile_name, int num_instances, ObjectPool* obj_pool) + : avg_profile_( + obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))), + root_profile_( + obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))), + num_instances_(num_instances) { +} + +void 
Coordinator::FragmentStats::AddSplitStats() { + double min = accumulators::min(bytes_assigned_); + double max = accumulators::max(bytes_assigned_); + double mean = accumulators::mean(bytes_assigned_); + double stddev = sqrt(accumulators::variance(bytes_assigned_)); + stringstream ss; + ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES) + << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES) + << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES) + << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES); + avg_profile_->AddInfoString("split sizes", ss.str()); +} + +// Comparator to order RuntimeProfiles by descending total time +typedef struct { + typedef pair<RuntimeProfile*, bool> Profile; + bool operator()(const Profile& a, const Profile& b) const { + // Reverse ordering: we want the longest first + return + a.first->total_time_counter()->value() > b.first->total_time_counter()->value(); + } +} InstanceComparator; + +void Coordinator::FragmentStats::AddExecStats() { + InstanceComparator comparator; + root_profile_->SortChildren(comparator); + + stringstream times_label; + times_label + << "min:" << PrettyPrinter::Print( + accumulators::min(completion_times_), TUnit::TIME_NS) + << " max:" << PrettyPrinter::Print( + accumulators::max(completion_times_), TUnit::TIME_NS) + << " mean: " << PrettyPrinter::Print( + accumulators::mean(completion_times_), TUnit::TIME_NS) + << " stddev:" << PrettyPrinter::Print( + sqrt(accumulators::variance(completion_times_)), TUnit::TIME_NS); + + stringstream rates_label; + rates_label + << "min:" << PrettyPrinter::Print( + accumulators::min(rates_), TUnit::BYTES_PER_SECOND) + << " max:" << PrettyPrinter::Print( + accumulators::max(rates_), TUnit::BYTES_PER_SECOND) + << " mean:" << PrettyPrinter::Print( + accumulators::mean(rates_), TUnit::BYTES_PER_SECOND) + << " stddev:" << PrettyPrinter::Print( + sqrt(accumulators::variance(rates_)), TUnit::BYTES_PER_SECOND); + + // why plural? 
+ avg_profile_->AddInfoString("completion times", times_label.str()); + // why plural? + avg_profile_->AddInfoString("execution rates", rates_label.str()); + avg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_)); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h new file mode 100644 index 0000000..b4f9fea --- /dev/null +++ b/be/src/runtime/coordinator-backend-state.h @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef IMPALA_RUNTIME_COORDINATOR_BACKEND_STATE_H +#define IMPALA_RUNTIME_COORDINATOR_BACKEND_STATE_H + +#include <vector> +#include <unordered_set> + +#include <boost/thread/mutex.hpp> + +#include "runtime/coordinator.h" +#include "util/progress-updater.h" +#include "util/stopwatch.h" +#include "util/runtime-profile.h" +#include "gen-cpp/Types_types.h" + +namespace impala { + +class ProgressUpdater; +class FInstanceExecParams; +class ObjectPool; +class DebugOptions; +class CountingBarrier; +class TUniqueId; +class TQueryCtx; +class TReportExecStatusParams; +class ExecSummary; + +/// This class manages all aspects of the execution of all fragment instances of a +/// single query on a particular backend. +/// Thread-safe unless pointed out otherwise. +class Coordinator::BackendState { + public: + BackendState(const TUniqueId& query_id, int state_idx, + TRuntimeFilterMode::type filter_mode); + + /// Creates InstanceStats for all entries in instance_params_list in obj_pool + /// and installs the instance profiles as children of the corresponding FragmentStats' + /// root profile. + /// Separated from c'tor to simplify future handling of out-of-mem errors. + void Init(const vector<const FInstanceExecParams*>& instance_params_list, + const std::vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool); + + /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and + /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is + /// communicated through GetStatus(). Uses filter_routing_table to remove filters + /// that weren't selected during its construction. + /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based + /// on their node_id/instance_idx. 
+ void Exec(const TQueryCtx& query_ctx, const DebugOptions& debug_options, + const FilterRoutingTable& filter_routing_table, + CountingBarrier* rpc_complete_barrier); + + /// Update overall execution status, including the instances' exec status/profiles + /// and the error log. Updates the fragment instances' TExecStats in exec_summary + /// (exec_summary->nodes.exec_stats) and updates progress_update, and sets + /// done to true if all fragment instances completed, regardless of status. + /// If any instance reports an error, the overall execution status becomes the first + /// reported error status and 'done' is set to true. + void ApplyExecStatusReport(const TReportExecStatusParams& backend_exec_status, + ExecSummary* exec_summary, ProgressUpdater* scan_range_progress, bool* done); + + /// Update completion_times, rates, and avg_profile for all fragment_stats. + void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats); + + /// Make a PublishFilter rpc with given params if this backend has instances of the + /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing. + /// This takes by-value parameters because we cannot guarantee that the originating + /// coordinator won't be destroyed while this executes. + /// TODO: switch to references when we fix the lifecycle problems of coordinators. + void PublishFilter(std::shared_ptr<TPublishFilterParams> rpc_params); + + /// Cancel execution at this backend if anything is running. Returns true + /// if cancellation was attempted, false otherwise. + bool Cancel(); + + /// Return the overall execution status. For an error status, also return the id + /// of the instance that caused that status, if failed_instance_id != nullptr. + Status GetStatus(TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT; + + /// Return peak memory consumption. + int64_t GetPeakConsumption(); + + /// Merge the accumulated error log into 'merged'. 
+ void MergeErrorLog(ErrorLogMap* merged); + + const TNetworkAddress& impalad_address() const { return host_; } + int state_idx() const { return state_idx_; } + + /// only valid after Exec() + int64_t rpc_latency() const { return rpc_latency_; } + + /// Return true if execution at this backend is done. + bool IsDone(); + + private: + /// Execution stats for a single fragment instance. + /// Not thread-safe. + class InstanceStats { + public: + InstanceStats(const FInstanceExecParams& exec_params, FragmentStats* fragment_stats, + ObjectPool* obj_pool); + + /// Update 'this' with exec_status, the fragment instances' TExecStats in + /// exec_summary, and 'progress_updater' with the number of + /// newly completed scan ranges. Also updates the instance's avg profile. + void Update(const TFragmentInstanceExecStatus& exec_status, + ExecSummary* exec_summary, ProgressUpdater* scan_range_progress); + + int per_fragment_instance_idx() const { + return exec_params_.per_fragment_instance_idx; + } + + private: + friend class BackendState; + + /// query lifetime + const FInstanceExecParams& exec_params_; + + /// owned by coordinator object pool provided in the c'tor, created in Update() + RuntimeProfile* profile_; + + /// true after the first call to profile->Update() + bool profile_created_; + + /// cumulative size of all splits; set in c'tor + int64_t total_split_size_; + + /// wall clock timer for this instance + MonotonicStopWatch stopwatch_; + + /// total scan ranges complete across all scan nodes + int64_t total_ranges_complete_; + + /// SCAN_RANGES_COMPLETE_COUNTERs in profile_ + std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_; + + /// PER_HOST_PEAK_MEM_COUNTER + RuntimeProfile::Counter* peak_mem_counter_; + + /// Extract scan_ranges_complete_counters_ and peak_mem_counter_ from profile_. 
+ void InitCounters(); + }; + + const TUniqueId query_id_; + const int state_idx_; /// index of 'this' in Coordinator::backend_states_ + const TRuntimeFilterMode::type filter_mode_; + + /// all instances of a particular fragment are contiguous in this vector; + /// query lifetime + std::vector<const FInstanceExecParams*> instance_params_list_; + + /// map from instance idx to InstanceStats, the latter live in the obj_pool parameter + /// of Init() + std::unordered_map<int, InstanceStats*> instance_stats_map_; + + /// indices of fragments executing on this backend, populated in Init() + std::unordered_set<int> fragments_; + + TNetworkAddress host_; + + /// protects fields below + /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_ + boost::mutex lock_; + + // number of in-flight instances + int num_remaining_instances_; + + /// If the status indicates an error status, execution has either been aborted by the + /// executing impalad (which then reported the error) or cancellation has been + /// initiated; either way, execution must not be cancelled. + Status status_; + + /// Id of the first fragment instance that reports an error status. + /// Invalid if no fragment instance has reported an error status. + TUniqueId failed_instance_id_; + + /// Errors reported by this fragment instance. + ErrorLogMap error_log_; + + /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC. + int64_t rpc_latency_; + + /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be + /// successful. + bool rpc_sent_; + + /// peak memory used for this query (value of that node's query memtracker's + /// peak_consumption() + int64_t peak_consumption_; + + /// Fill in rpc_params based on state. Uses filter_routing_table to remove filters + /// that weren't selected during its construction. 
+ void SetRpcParams(const DebugOptions& debug_options, + const FilterRoutingTable& filter_routing_table, + TExecQueryFInstancesParams* rpc_params); + + /// Return true if execution at this backend is done. Doesn't acquire lock. + bool IsDoneInternal() const; +}; + +/// Per fragment execution statistics. +class Coordinator::FragmentStats { + public: + /// typedef for boost utility to compute averaged stats + typedef boost::accumulators::accumulator_set<int64_t, + boost::accumulators::features< + boost::accumulators::tag::min, + boost::accumulators::tag::max, + boost::accumulators::tag::mean, + boost::accumulators::tag::variance> + > SummaryStats; + + /// Create avg and root profiles in obj_pool. + FragmentStats(const std::string& avg_profile_name, + const std::string& root_profile_name, + int num_instances, ObjectPool* obj_pool); + + RuntimeProfile* avg_profile() { return avg_profile_; } + RuntimeProfile* root_profile() { return root_profile_; } + SummaryStats* bytes_assigned() { return &bytes_assigned_; } + + /// Compute stats for 'bytes_assigned' and add as info string to avg_profile. + void AddSplitStats(); + + /// Add summary string with execution stats to avg profile. + void AddExecStats(); + + private: + friend class BackendState; + + /// Averaged profile for this fragment. Stored in obj_pool. + /// The counters in this profile are averages (type AveragedCounter) of the + /// counters in the fragment instance profiles. + /// Note that the individual fragment instance profiles themselves are stored and + /// displayed as children of the root_profile below. + RuntimeProfile* avg_profile_; + + /// root profile for all fragment instances for this fragment; resides in obj_pool + RuntimeProfile* root_profile_; + + /// Number of instances running this fragment. 
+ int num_instances_; + + /// Bytes assigned for instances of this fragment + SummaryStats bytes_assigned_; + + /// Completion times for instances of this fragment + SummaryStats completion_times_; + + /// Execution rates for instances of this fragment + SummaryStats rates_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-filter-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h new file mode 100644 index 0000000..61dece9 --- /dev/null +++ b/be/src/runtime/coordinator-filter-state.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include <memory> +#include <vector> +#include <boost/unordered_set.hpp> + +#include "runtime/coordinator.h" +#include "gen-cpp/ImpalaInternalService_types.h" +#include "gen-cpp/PlanNodes_types.h" +#include "gen-cpp/Types_types.h" + +namespace impala { + +class MemTracker; + +/// Represents a runtime filter target. 
+struct Coordinator::FilterTarget {
+  TPlanNodeId node_id;  ///< id of the plan node targeted by the filter
+  bool is_local;  ///< copied from TRuntimeFilterTargetDesc::is_local_target
+  bool is_bound_by_partition_columns;  ///< copied from the target desc
+  int fragment_idx;  ///< fragment index supplied by the caller (presumably node_id's)
+
+  FilterTarget(const TRuntimeFilterTargetDesc& target_desc, int target_fragment_idx)
+    : node_id(target_desc.node_id), is_local(target_desc.is_local_target),
+      is_bound_by_partition_columns(target_desc.is_bound_by_partition_columns),
+      fragment_idx(target_fragment_idx) {
+  }
+};
+
+/// Aggregation state for a single runtime filter.
+///
+/// Broadcast join filters: the first update received is published right away; any
+/// later updates are dropped, since they are expected to be identical.
+///
+/// Partitioned join filters: updates are merged into 'bloom_filter', which is
+/// published once 'pending_count' drops to 0 unless the filter was disabled first.
+///
+/// A filter gets disabled when an always_true update arrives, an OOM is hit,
+/// aggregation completes, or the query completes. Updates that arrive for a
+/// disabled filter are ignored.
+class Coordinator::FilterState { + public: + FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src) + : desc_(desc), src_(src), pending_count_(0), first_arrival_time_(0L), + completion_time_(0L), disabled_(false) { } + + TBloomFilter* bloom_filter() { return bloom_filter_.get(); } + boost::unordered_set<int>* src_fragment_instance_idxs() { + return &src_fragment_instance_idxs_; + } + const boost::unordered_set<int>& src_fragment_instance_idxs() const { + return src_fragment_instance_idxs_; + } + std::vector<FilterTarget>* targets() { return &targets_; } + const std::vector<FilterTarget>& targets() const { return targets_; } + int64_t first_arrival_time() const { return first_arrival_time_; } + int64_t completion_time() const { return completion_time_; } + const TPlanNodeId& src() const { return src_; } + const TRuntimeFilterDesc& desc() const { return desc_; } + int pending_count() const { return pending_count_; } + void set_pending_count(int pending_count) { pending_count_ = pending_count; } + bool disabled() const { return disabled_; } + + /// Aggregates partitioned join filters and updates memory consumption. + /// Disables filter if always_true filter is received or OOM is hit. + void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord); + + /// Disables a filter. A disabled filter consumes no memory. + void Disable(MemTracker* tracker); + + private: + /// Contains the specification of the runtime filter. + TRuntimeFilterDesc desc_; + + TPlanNodeId src_; + std::vector<FilterTarget> targets_; + + // Indices of source fragment instances (as returned by GetInstanceIdx()). + boost::unordered_set<int> src_fragment_instance_idxs_; + + /// Number of remaining backends to hear from before filter is complete. + int pending_count_; + + /// BloomFilter aggregated from all source plan nodes, to be broadcast to all + /// destination plan fragment instances. Owned by this object so that it can be + /// deallocated once finished with. 
Only set for partitioned joins (broadcast joins + /// need no aggregation). + /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the + /// output structure in the case of a broadcast join. Similarly, for partitioned joins, + /// the filter is moved from the following member to the output structure. + std::unique_ptr<TBloomFilter> bloom_filter_; + + /// Time at which first local filter arrived. + int64_t first_arrival_time_; + + /// Time at which all local filters arrived. + int64_t completion_time_; + + /// True if the filter is permanently disabled for this query. + bool disabled_; + + /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_ + /// for every filter update. + +}; + +}
