IMPALA-4014: Introduce query-wide execution state.

This introduces a query-wide structure that coordinates execution of all of a single query's fragment instances on a backend.
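For orientation, a minimal sketch of how the new classes fit together on a backend, reconstructed from the coordinator-side changes in this patch: QueryExecMgr replaces FragmentMgr as the per-backend registry, QueryState holds state shared by all of a query's instances on that backend, and each FragmentInstanceState wraps a single PlanFragmentExecutor. The variable names and the WaitForPrepare() step below are illustrative of one call path (the coordinator's), not a complete client:

    // Sketch only; error handling elided. GetQueryState() appears to hand out a
    // reference that must be returned via ReleaseQueryState().
    QueryExecMgr* mgr = ExecEnv::GetInstance()->query_exec_mgr();
    QueryState* qs = mgr->GetQueryState(query_id);
    if (qs != nullptr) {
      // The coordinator fragment instance shares its id with the query.
      FragmentInstanceState* fis = qs->GetFInstanceState(query_id);
      if (fis != nullptr) {
        // Wait for the instance's executor to finish Prepare(), then drive
        // execution or publish filters through fis.
        Status prepared = fis->executor()->WaitForPrepare();
      }
      mgr->ReleaseQueryState(qs);
    }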
New classes: - QueryExecMgr: subsumes FragmentMgr - QueryState - FragmentInstanceState: replaces FragmentExecState Change-Id: I962ae6b7cb7dc0d07fbb8f70317aeb01d88d400b Reviewed-on: http://gerrit.cloudera.org:8080/4418 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4b2d76db Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4b2d76db Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4b2d76db Branch: refs/heads/hadoop-next Commit: 4b2d76dbb523c3761a6f53983b635ce88bc67a0c Parents: 48792eb Author: Marcel Kornacker <[email protected]> Authored: Wed Oct 26 14:02:44 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sun Dec 11 02:29:28 2016 +0000 ---------------------------------------------------------------------- be/src/benchmarks/expr-benchmark.cc | 4 +- be/src/exec/catalog-op-executor.cc | 2 + be/src/exec/data-source-scan-node.cc | 2 +- be/src/exec/exchange-node.cc | 1 + be/src/exec/hash-table-test.cc | 7 +- be/src/exec/hdfs-scan-node-base.cc | 4 +- be/src/exec/hdfs-scan-node.cc | 2 +- be/src/exec/union-node.cc | 2 +- be/src/exprs/expr-test.cc | 6 +- be/src/runtime/CMakeLists.txt | 3 + be/src/runtime/buffered-block-mgr-test.cc | 8 +- be/src/runtime/buffered-tuple-stream-test.cc | 3 +- be/src/runtime/coordinator.cc | 150 +++++++++-------- be/src/runtime/coordinator.h | 39 +++-- be/src/runtime/data-stream-test.cc | 16 +- be/src/runtime/exec-env.cc | 6 +- be/src/runtime/exec-env.h | 6 +- be/src/runtime/fragment-instance-state.cc | 165 +++++++++++++++++++ be/src/runtime/fragment-instance-state.h | 118 +++++++++++++ be/src/runtime/plan-fragment-executor.cc | 76 ++++----- be/src/runtime/plan-fragment-executor.h | 16 +- be/src/runtime/query-exec-mgr.cc | 165 +++++++++++++++++++ be/src/runtime/query-exec-mgr.h | 78 +++++++++ be/src/runtime/query-state.cc | 69 ++++++++ be/src/runtime/query-state.h | 117 +++++++++++++ be/src/runtime/runtime-filter-bank.cc | 6 +- be/src/runtime/runtime-state.cc | 99 +++++++---- be/src/runtime/runtime-state.h | 90 +++++----- be/src/runtime/test-env.cc | 51 +++--- be/src/runtime/test-env.h | 7 +- be/src/runtime/thread-resource-mgr.h | 2 + be/src/scheduling/query-schedule.cc | 2 +- be/src/scheduling/request-pool-service.cc | 2 +- be/src/scheduling/simple-scheduler.cc | 3 +- be/src/service/CMakeLists.txt | 3 +- be/src/service/fe-support.cc | 10 +- be/src/service/fragment-exec-state.cc | 145 ---------------- be/src/service/fragment-exec-state.h | 105 ------------ be/src/service/fragment-mgr.cc | 154 ----------------- be/src/service/fragment-mgr.h | 89 ---------- be/src/service/impala-beeswax-server.cc | 22 +-- be/src/service/impala-hs2-server.cc | 34 ++-- be/src/service/impala-http-handler.cc | 1 + be/src/service/impala-internal-service.cc | 103 ++++++++++++ be/src/service/impala-internal-service.h | 54 ++---- be/src/service/impala-server.cc | 9 +- be/src/service/impala-server.h | 6 +- be/src/service/query-exec-state.cc | 25 +-- be/src/service/query-exec-state.h | 6 +- be/src/testutil/desc-tbl-builder.h | 5 + be/src/udf/udf.cc | 4 +- be/src/util/container-util.h | 1 + be/src/util/thread.h | 5 + be/src/util/uid-util.h | 18 +- common/thrift/ImpalaInternalService.thrift | 3 +- .../apache/impala/analysis/AnalysisContext.java | 2 +- .../org/apache/impala/analysis/Analyzer.java | 2 +- .../impala/analysis/ColumnLineageGraph.java | 6 +- 
.../org/apache/impala/analysis/SelectStmt.java | 2 +- .../org/apache/impala/planner/HdfsScanNode.java | 2 +- .../impala/planner/SingleNodePlanner.java | 2 +- .../org/apache/impala/planner/UnionNode.java | 2 +- .../org/apache/impala/service/Frontend.java | 18 +- .../apache/impala/common/FrontendTestBase.java | 2 +- .../org/apache/impala/planner/PlannerTest.java | 6 +- .../apache/impala/planner/PlannerTestBase.java | 16 +- .../org/apache/impala/service/FrontendTest.java | 4 +- .../org/apache/impala/testutil/TestUtils.java | 4 +- 68 files changed, 1296 insertions(+), 901 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/benchmarks/expr-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc index 444df3a..cda183f 100644 --- a/be/src/benchmarks/expr-benchmark.cc +++ b/be/src/benchmarks/expr-benchmark.cc @@ -73,8 +73,8 @@ class Planner { Status GeneratePlan(const string& stmt, TExecRequest* result) { TQueryCtx query_ctx; - query_ctx.request.stmt = stmt; - query_ctx.request.query_options = query_options_; + query_ctx.client_request.stmt = stmt; + query_ctx.client_request.query_options = query_options_; query_ctx.__set_session(session_state_); ImpalaServer::PrepareQueryContext(&query_ctx); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/catalog-op-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc index 4d39e95..f3aed05 100644 --- a/be/src/exec/catalog-op-executor.cc +++ b/be/src/exec/catalog-op-executor.cc @@ -22,6 +22,8 @@ #include "exec/incr-stats-util.h" #include "common/status.h" #include "runtime/lib-cache.h" +#include "runtime/client-cache-types.h" +#include "runtime/exec-env.h" #include "service/impala-server.h" #include "service/hs2-util.h" #include "util/string-parser.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/data-source-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index 0faae94..3198a73 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -111,7 +111,7 @@ Status DataSourceScanNode::Open(RuntimeState* state) { params.__set_query_id(state->query_id()); params.__set_table_name(tuple_desc_->table_desc()->name()); params.__set_init_string(data_src_node_.init_string); - params.__set_authenticated_user_name(state->effective_user()); + params.__set_authenticated_user_name(state->GetEffectiveUser()); params.__set_row_schema(row_schema); params.__set_batch_size(FLAGS_data_source_batch_size); params.__set_predicates(data_src_node_.accepted_predicates); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/exchange-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc index 0bbd42b..0f86339 100644 --- a/be/src/exec/exchange-node.cc +++ b/be/src/exec/exchange-node.cc @@ -23,6 +23,7 @@ #include "runtime/data-stream-recvr.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" +#include "runtime/exec-env.h" #include "util/debug-util.h" #include "util/runtime-profile-counters.h" 
#include "util/time.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index a539bb1..9f428f1 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -185,8 +185,8 @@ class HashTableTest : public testing::Test { bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024, int max_num_blocks = 100, int reserved_blocks = 10) { - EXPECT_OK(test_env_->CreatePerQueryState( - next_query_id_++, max_num_blocks, block_size, &runtime_state_)); + EXPECT_OK(test_env_->CreateQueryState( + next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_)); MemTracker* client_tracker = pool_.Add( new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); BufferedBlockMgr::Client* client; @@ -604,7 +604,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) { // Test that hashing empty string updates hash value. TEST_F(HashTableTest, HashEmpty) { EXPECT_TRUE( - test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, &runtime_state_).ok()); + test_env_->CreateQueryState( + 0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_).ok()); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, probe_expr_ctxs_, false /* !stores_nulls_ */, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index f7677eb..738ef36 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -235,7 +235,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702. LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id() << " partition_id=" << split.partition_id - << "\n" << PrintThrift(state->fragment_params()); + << "\n" << PrintThrift(state->instance_ctx()); return Status("Query encountered invalid metadata, likely due to IMPALA-1702." 
" Try rerunning the query."); } @@ -367,7 +367,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id); DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id() << " partition_id=" << partition_id - << "\n" << PrintThrift(state->fragment_params()); + << "\n" << PrintThrift(state->instance_ctx()); partition_template_tuple_map_[partition_id] = InitTemplateTuple( partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 305fa28..03217e0 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -501,7 +501,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id); DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id() << " partition_id=" << partition_id - << "\n" << PrintThrift(runtime_state_->fragment_params()); + << "\n" << PrintThrift(runtime_state_->instance_ctx()); // IMPALA-3798: Filtering before the scanner is created can cause hangs if a header // split is filtered out, for sequence-based file formats. If the scanner does not http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index f4d3b69..5457737 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -172,7 +172,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { } // Only evaluate the const expr lists by the first fragment instance. - if (state->fragment_ctx().per_fragment_instance_idx == 0) { + if (state->instance_ctx().per_fragment_instance_idx == 0) { // Evaluate and materialize the const expr lists exactly once. 
while (const_expr_list_idx_ < const_expr_lists_.size()) { MaterializeExprs( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index f439aa9..4185ea1 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -41,6 +41,7 @@ #include "gen-cpp/hive_metastore_types.h" #include "rpc/thrift-client.h" #include "rpc/thrift-server.h" +#include "runtime/runtime-state.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" #include "runtime/string-value.h" @@ -52,6 +53,7 @@ #include "util/debug-util.h" #include "util/string-parser.h" #include "util/test-info.h" +#include "gen-cpp/ImpalaInternalService_types.h" #include "common/names.h" @@ -1025,7 +1027,7 @@ template <typename T> void TestSingleLiteralConstruction( const ColumnType& type, const T& value, const string& string_val) { ObjectPool pool; RowDescriptor desc; - RuntimeState state(TExecPlanFragmentParams(), NULL); + RuntimeState state{TQueryCtx()}; MemTracker tracker; Expr* expr = pool.Add(new Literal(type, value)); @@ -1041,7 +1043,7 @@ TEST_F(ExprTest, NullLiteral) { for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) { NullLiteral expr(static_cast<PrimitiveType>(type)); ExprContext ctx(&expr); - RuntimeState state(TExecPlanFragmentParams(), NULL); + RuntimeState state{TQueryCtx()}; MemTracker tracker; EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker)); EXPECT_OK(ctx.Open(&state)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 6602f07..640ab39 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(Runtime disk-io-mgr-stress.cc exec-env.cc free-pool.cc + fragment-instance-state.cc hbase-table.cc hbase-table-factory.cc hdfs-fs-cache.cc @@ -47,6 +48,8 @@ add_library(Runtime multi-precision.cc parallel-executor.cc plan-fragment-executor.cc + query-exec-mgr.cc + query-state.cc test-env.cc types.cc raw-value.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 5e99c09..1828ff8 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -142,8 +142,8 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size, RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) { RuntimeState* state; - EXPECT_OK(test_env_->CreatePerQueryState( - query_id, max_buffers, block_size, &state, query_options)); + EXPECT_OK(test_env_->CreateQueryState( + query_id, max_buffers, block_size, query_options, &state)); if (query_state != NULL) *query_state = state; return state->block_mgr(); } @@ -558,8 +558,8 @@ class BufferedBlockMgrTest : public ::testing::Test { thread_group workers; // Create a shared RuntimeState with no BufferedBlockMgr. 
RuntimeState* shared_state = - new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env()); - shared_state->InitMemTrackers(TUniqueId(), NULL, -1); + new RuntimeState(TQueryCtx(), test_env_->exec_env()); + shared_state->InitMemTrackers(NULL, -1); for (int i = 0; i < num_threads; ++i) { thread* t = new thread( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index e98c334..1d36064 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -101,7 +101,8 @@ class SimpleTupleStreamTest : public testing::Test { /// Setup a block manager with the provided settings and client with no reservation, /// tracked by tracker_. void InitBlockMgr(int64_t limit, int block_size) { - ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, &runtime_state_)); + ASSERT_OK( + test_env_->CreateQueryState(0, limit, block_size, nullptr, &runtime_state_)); MemTracker* client_tracker = pool_.Add( new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); ASSERT_OK(runtime_state_->block_mgr()->RegisterClient( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 651d349..0cfb340 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -58,9 +58,11 @@ #include "runtime/parallel-executor.h" #include "runtime/plan-fragment-executor.h" #include "runtime/row-batch.h" +#include "runtime/query-exec-mgr.h" +#include "runtime/query-state.h" +#include "runtime/fragment-instance-state.h" #include "runtime/tuple-row.h" #include "scheduling/scheduler.h" -#include "service/fragment-exec-state.h" #include "util/bloom-filter.h" #include "util/container-util.h" #include "util/counting-barrier.h" @@ -123,9 +125,9 @@ struct DebugOptions { /// /// Concurrent accesses: /// - updates through UpdateFragmentExecStatus() -class Coordinator::FragmentInstanceState { +class Coordinator::InstanceState { public: - FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) + InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) : exec_params_(params), total_split_size_(0), profile_(nullptr), @@ -333,7 +335,7 @@ class Coordinator::FilterState { }; -void Coordinator::FragmentInstanceState::ComputeTotalSplitSize( +void Coordinator::InstanceState::ComputeTotalSplitSize( const PerNodeScanRanges& per_node_scan_ranges) { total_split_size_ = 0; @@ -345,7 +347,7 @@ void Coordinator::FragmentInstanceState::ComputeTotalSplitSize( } } -int64_t Coordinator::FragmentInstanceState::UpdateNumScanRangesCompleted() { +int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() { int64_t total = 0; CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters; for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) { @@ -382,6 +384,10 @@ Coordinator::~Coordinator() { query_mem_tracker_.reset(); } +PlanFragmentExecutor* Coordinator::executor() { + return coord_instance_->executor(); +} + TExecNodePhase::type GetExecNodePhase(const string& key) { map<int, const char*>::const_iterator entry = _TExecNodePhase_VALUES_TO_NAMES.begin(); @@ -439,7 +445,7 @@ Status 
Coordinator::Exec() { if (needs_finalization_) finalize_params_ = request.finalize_params; VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() - << " stmt=" << request.query_ctx.request.stmt; + << " stmt=" << request.query_ctx.client_request.stmt; stmt_type_ = request.stmt_type; query_id_ = schedule_.query_id(); desc_tbl_ = request.desc_tbl; @@ -457,7 +463,7 @@ Status Coordinator::Exec() { progress_.Init(str, schedule_.num_scan_ranges()); // runtime filters not yet supported for mt execution - bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; + bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0; if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF; // to keep things simple, make async Cancel() calls wait until plan fragment @@ -468,9 +474,9 @@ Status Coordinator::Exec() { // The coordinator may require a query mem tracker for result-caching, which tracks // memory via the query mem tracker. int64_t query_limit = -1; - if (query_ctx_.request.query_options.__isset.mem_limit - && query_ctx_.request.query_options.mem_limit > 0) { - query_limit = query_ctx_.request.query_options.mem_limit; + if (query_ctx_.client_request.query_options.__isset.mem_limit + && query_ctx_.client_request.query_options.mem_limit > 0) { + query_limit = query_ctx_.client_request.query_options.mem_limit; } MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker( schedule_.request_pool(), exec_env_->process_mem_tracker()); @@ -484,38 +490,42 @@ Status Coordinator::Exec() { InitExecSummary(); StartFInstances(); - // In the error case, it's safe to return and not to get root_sink_ here to close - if + // In the error case, it's safe to return and not to get coord_sink_ here to close - if // there was an error, but the coordinator fragment was successfully started, it should // cancel itself when it receives an error status after reporting its profile. RETURN_IF_ERROR(FinishInstanceStartup()); // Grab executor and wait until Prepare() has finished so that runtime state etc. will - // be set up. Must do this here in order to get a reference to root_fragment_instance_ - // so that root_sink_ remains valid throughout query lifetime. + // be set up. Must do this here in order to get a reference to coord_instance_ + // so that coord_sink_ remains valid throughout query lifetime. if (schedule_.GetCoordFragment() != nullptr) { - // Coordinator fragment instance has same ID as query. - root_fragment_instance_ = - ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_); - // Fragment instance might have been failed and unregistered itself even though it was - // successfully started (e.g. Prepare() might have failed). - if (root_fragment_instance_.get() == nullptr) { - FragmentInstanceState* root_state = fragment_instance_states_[0]; - DCHECK(root_state != nullptr); - lock_guard<mutex> instance_state_lock(*root_state->lock()); + QueryState* qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id_); + if (qs != nullptr) coord_instance_ = qs->GetFInstanceState(query_id_); + if (coord_instance_ == nullptr) { + // Coordinator instance might have failed and unregistered itself even + // though it was successfully started (e.g. Prepare() might have failed). 
+ if (qs != nullptr) { + ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs); + qs = nullptr; + } + InstanceState* coord_state = fragment_instance_states_[0]; + DCHECK(coord_state != nullptr); + lock_guard<mutex> instance_state_lock(*coord_state->lock()); // Try and return the fragment instance status if it was already set. - // TODO: Consider waiting for root_state->done() here. - RETURN_IF_ERROR(*root_state->status()); - return Status(Substitute("Root fragment instance ($0) failed", PrintId(query_id_))); + // TODO: Consider waiting for coord_state->done() here. + RETURN_IF_ERROR(*coord_state->status()); + return Status( + Substitute("Coordinator fragment instance ($0) failed", PrintId(query_id_))); } - executor_ = root_fragment_instance_->executor(); + // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the // fragment instance's executor will not complete until that point. // TODO: Consider moving this to Wait(). - Status prepare_status = executor_->WaitForPrepare(); - root_sink_ = executor_->root_sink(); + Status prepare_status = executor()->WaitForPrepare(); + coord_sink_ = executor()->root_sink(); RETURN_IF_ERROR(prepare_status); - DCHECK(root_sink_ != nullptr); + DCHECK(coord_sink_ != nullptr); } PrintFragmentInstanceInfo(); @@ -523,7 +533,7 @@ Status Coordinator::Exec() { } void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) { - DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0); + DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0); int num_hosts = fragment_params.instance_exec_params.size(); DCHECK_GT(num_hosts, 0); DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) @@ -614,8 +624,8 @@ void Coordinator::StartFInstances() { num_instances += fragment_params.instance_exec_params.size(); for (const FInstanceExecParams& instance_params: fragment_params.instance_exec_params) { - FragmentInstanceState* exec_state = obj_pool()->Add( - new FragmentInstanceState(instance_params, obj_pool())); + InstanceState* exec_state = obj_pool()->Add( + new InstanceState(instance_params, obj_pool())); int instance_state_idx = GetInstanceIdx(instance_params.instance_id); fragment_instance_states_[instance_state_idx] = exec_state; @@ -638,7 +648,7 @@ Status Coordinator::FinishInstanceStartup() { const TMetricDef& def = MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); HistogramMetric latencies(def, 20000, 3); - for (FragmentInstanceState* exec_state: fragment_instance_states_) { + for (InstanceState* exec_state: fragment_instance_states_) { lock_guard<mutex> l(*exec_state->lock()); // Preserve the first non-OK status, if there is one if (status.ok()) status = *exec_state->status(); @@ -859,7 +869,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id); DCHECK(part != NULL) << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id - << "\n" << PrintThrift(runtime_state()->fragment_params()); + << "\n" << PrintThrift(runtime_state()->instance_ctx()); part_path_ss << part->location(); } const string& part_path = part_path_ss.str(); @@ -924,7 +934,8 @@ Status Coordinator::FinalizeSuccessfulInsert() { partition_create_ops.Add(CREATE_DIR, part_path); } } - } else if (!is_s3_path || !query_ctx_.request.query_options.s3_skip_insert_staging) { + } else if (!is_s3_path + || 
!query_ctx_.client_request.query_options.s3_skip_insert_staging) { // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories // would have already been created by the table sinks. if (FLAGS_insert_inherit_permissions && !is_s3_path) { @@ -1071,8 +1082,8 @@ Status Coordinator::Wait() { has_called_wait_ = true; if (stmt_type_ == TStmtType::QUERY) { - DCHECK(executor_ != nullptr); - return UpdateStatus(executor_->WaitForOpen(), runtime_state()->fragment_instance_id(), + DCHECK(executor() != nullptr); + return UpdateStatus(executor()->WaitForOpen(), runtime_state()->fragment_instance_id(), FLAGS_hostname); } @@ -1106,15 +1117,15 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { if (returned_all_results_) { // May be called after the first time we set *eos. Re-set *eos and return here; - // already torn-down root_sink_ so no more work to do. + // already torn-down coord_sink_ so no more work to do. *eos = true; return Status::OK(); } - DCHECK(root_sink_ != nullptr) + DCHECK(coord_sink_ != nullptr) << "GetNext() called without result sink. Perhaps Prepare() failed and was not " << "checked?"; - Status status = root_sink_->GetNext(runtime_state(), results, max_rows, eos); + Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos); // if there was an error, we need to return the query's error status rather than // the status we just got back from the local executor (which may well be CANCELLED @@ -1126,8 +1137,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { returned_all_results_ = true; // Trigger tear-down of coordinator fragment by closing the consumer. Must do before // WaitForAllInstances(). - root_sink_->CloseConsumer(); - root_sink_ = nullptr; + coord_sink_->CloseConsumer(); + coord_sink_ = nullptr; // Don't return final NULL until all instances have completed. GetNext must wait for // all instances to complete before ultimately signalling the end of execution via a @@ -1148,12 +1159,12 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { } void Coordinator::PrintFragmentInstanceInfo() { - for (FragmentInstanceState* state: fragment_instance_states_) { + for (InstanceState* state: fragment_instance_states_) { SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned; acc(state->total_split_size()); } - for (int id = (executor_ == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) { + for (int id = (executor() == NULL ? 
0 : 1); id < fragment_profiles_.size(); ++id) { SummaryStats& acc = fragment_profiles_[id].bytes_assigned; double min = accumulators::min(acc); double max = accumulators::max(acc); @@ -1168,7 +1179,7 @@ void Coordinator::PrintFragmentInstanceInfo() { if (VLOG_FILE_IS_ON) { VLOG_FILE << "Byte split for fragment " << id << " " << ss.str(); - for (FragmentInstanceState* exec_state: fragment_instance_states_) { + for (InstanceState* exec_state: fragment_instance_states_) { if (exec_state->fragment_idx() != id) continue; VLOG_FILE << "data volume for ipaddress " << exec_state << ": " << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES); @@ -1284,7 +1295,7 @@ void Coordinator::ExecRemoteFInstance( rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); } int instance_state_idx = GetInstanceIdx(exec_params.instance_id); - FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx]; + InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; exec_state->ComputeTotalSplitSize( rpc_params.fragment_instance_ctx.per_node_scan_ranges); VLOG_FILE << "making rpc: ExecPlanFragment" @@ -1357,7 +1368,7 @@ void Coordinator::CancelInternal() { void Coordinator::CancelFragmentInstances() { int num_cancelled = 0; - for (FragmentInstanceState* exec_state: fragment_instance_states_) { + for (InstanceState* exec_state: fragment_instance_states_) { DCHECK(exec_state != nullptr); // lock each exec_state individually to synchronize correctly with @@ -1433,7 +1444,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para Substitute("Unknown fragment instance index $0 (max known: $1)", instance_state_idx, fragment_instance_states_.size() - 1)); } - FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx]; + InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; const TRuntimeProfileTree& cumulative_profile = params.profile; Status status(params.status); @@ -1507,9 +1518,8 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para if (VLOG_FILE_IS_ON) { stringstream s; exec_state->profile()->PrettyPrint(&s); - VLOG_FILE << "profile for query_id=" << query_id_ - << " instance_id=" << exec_state->fragment_instance_id() - << "\n" << s.str(); + VLOG_FILE << "profile for instance_id=" << exec_state->fragment_instance_id() + << "\n" << s.str(); } // also print the cumulative profile // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed @@ -1533,14 +1543,14 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para lock_guard<mutex> l(lock_); exec_state->stopwatch()->Stop(); DCHECK_GT(num_remaining_fragment_instances_, 0); - VLOG_QUERY << "Fragment instance completed: " + VLOG_QUERY << "Fragment instance completed:" << " id=" << PrintId(exec_state->fragment_instance_id()) << " host=" << exec_state->impalad_address() << " remaining=" << num_remaining_fragment_instances_ - 1; if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) { // print host/port info for the first backend that's still in progress as a // debugging aid for backend deadlocks - for (FragmentInstanceState* exec_state: fragment_instance_states_) { + for (InstanceState* exec_state: fragment_instance_states_) { lock_guard<mutex> l2(*exec_state->lock()); if (!exec_state->done()) { VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: " @@ -1567,7 +1577,7 @@ uint64_t Coordinator::GetLatestKuduInsertTimestamp() const { } 
RuntimeState* Coordinator::runtime_state() { - return executor_ == NULL ? NULL : executor_->runtime_state(); + return executor() == NULL ? NULL : executor()->runtime_state(); } MemTracker* Coordinator::query_mem_tracker() { @@ -1595,7 +1605,7 @@ typedef struct { } } InstanceComparator; -void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) { +void Coordinator::UpdateAverageProfile(InstanceState* instance_state) { FragmentIdx fragment_idx = instance_state->fragment_idx(); DCHECK_GE(fragment_idx, 0); DCHECK_LT(fragment_idx, fragment_profiles_.size()); @@ -1608,7 +1618,7 @@ void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) { data->root_profile->AddChild(instance_state->profile()); } -void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_state) { +void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) { FragmentIdx fragment_idx = instance_state->fragment_idx(); DCHECK_GE(fragment_idx, 0); DCHECK_LT(fragment_idx, fragment_profiles_.size()); @@ -1625,7 +1635,7 @@ void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_st data->root_profile->AddChild(instance_state->profile()); } -void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) { +void Coordinator::UpdateExecSummary(const InstanceState& instance_state) { vector<RuntimeProfile*> children; instance_state.profile()->GetAllChildren(&children); @@ -1667,12 +1677,12 @@ void Coordinator::ReportQuerySummary() { if (!fragment_instance_states_.empty()) { // Average all fragment instances for each fragment. - for (FragmentInstanceState* state: fragment_instance_states_) { + for (InstanceState* state: fragment_instance_states_) { state->profile()->ComputeTimeInProfile(); UpdateAverageProfile(state); // Skip coordinator fragment, if one exists. // TODO: Can we remove the special casing here? - if (executor_ == nullptr || state->fragment_idx() != 0) { + if (coord_instance_ == nullptr || state->fragment_idx() != 0) { ComputeFragmentSummaryStats(state); } UpdateExecSummary(*state); @@ -1680,7 +1690,7 @@ void Coordinator::ReportQuerySummary() { InstanceComparator comparator; // Per fragment instances have been collected, output summaries - for (int i = (executor_ != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) { + for (int i = (executor() != NULL ? 
1 : 0); i < fragment_profiles_.size(); ++i) { fragment_profiles_[i].root_profile->SortChildren(comparator); SummaryStats& completion_times = fragment_profiles_[i].completion_times; SummaryStats& rates = fragment_profiles_[i].rates; @@ -1719,7 +1729,7 @@ void Coordinator::ReportQuerySummary() { // Map from Impalad address to peak memory usage of this query typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage; PerNodePeakMemoryUsage per_node_peak_mem_usage; - for (FragmentInstanceState* state: fragment_instance_states_) { + for (InstanceState* state: fragment_instance_states_) { int64_t initial_usage = 0; int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage, state->impalad_address(), initial_usage); @@ -1740,7 +1750,7 @@ void Coordinator::ReportQuerySummary() { string Coordinator::GetErrorLog() { ErrorLogMap merged; - for (FragmentInstanceState* state: fragment_instance_states_) { + for (InstanceState* state: fragment_instance_states_) { lock_guard<mutex> l(*state->lock()); if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log()); } @@ -1760,7 +1770,7 @@ void Coordinator::SetExecPlanFragmentParams( // Remove filters that weren't selected during filter routing table construction. if (filter_mode_ != TRuntimeFilterMode::OFF) { - DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0); + DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0); int instance_idx = GetInstanceIdx(params.instance_id); for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) { if (plan_node.__isset.runtime_filters) { @@ -1893,6 +1903,8 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params, } +// TODO: call this as soon as it's clear that we won't reference the state +// anymore, ie, in CancelInternal() and when GetNext() hits eos void Coordinator::TearDown() { DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice"; torn_down_ = true; @@ -1909,7 +1921,15 @@ void Coordinator::TearDown() { } // Need to protect against failed Prepare(), where root_sink() would not be set. 
- if (root_sink_ != nullptr) root_sink_->CloseConsumer(); + if (coord_sink_ != nullptr) { + coord_sink_->CloseConsumer(); + coord_sink_ = nullptr; + } + if (coord_instance_ != nullptr) { + ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState( + coord_instance_->query_state()); + coord_instance_ = nullptr; + } } void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { @@ -1987,7 +2007,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { rpc_params->filter_id = params.filter_id; for (int target_idx: target_fragment_instance_state_idxs) { - FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx]; + InstanceState* fragment_inst = fragment_instance_states_[target_idx]; DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx; exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params, fragment_inst->impalad_address(), fragment_inst->fragment_instance_id())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index d53f16c..e1334db 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -38,13 +38,11 @@ #include "common/status.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Types_types.h" -#include "runtime/runtime-state.h" -#include "scheduling/simple-scheduler.h" -#include "service/fragment-exec-state.h" -#include "service/fragment-mgr.h" #include "util/histogram-metric.h" #include "util/progress-updater.h" #include "util/runtime-profile.h" +#include "scheduling/query-schedule.h" +#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle namespace impala { @@ -69,6 +67,9 @@ class RuntimeProfile; class TablePrinter; class TPlanFragment; class QueryResultSet; +class MemTracker; +class PlanRootSink; +class FragmentInstanceState; struct DebugOptions; @@ -212,7 +213,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save void TearDown(); private: - class FragmentInstanceState; + class InstanceState; struct FilterTarget; class FilterState; @@ -249,10 +250,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save CounterMap scan_ranges_complete_counters; }; - /// FragmentInstanceStates for all fragment instances, including that of the coordinator + /// InstanceStates for all fragment instances, including that of the coordinator /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in /// StartFInstances(). - std::vector<FragmentInstanceState*> fragment_instance_states_; + std::vector<InstanceState*> fragment_instance_states_; /// True if the query needs a post-execution step to tidy up bool needs_finalization_; @@ -273,7 +274,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// time. /// Lock ordering is /// 1. lock_ - /// 2. FragmentInstanceState::lock_ + /// 2. InstanceState::lock_ boost::mutex lock_; /// Overall status of the entire query; set to the first reported fragment error @@ -294,13 +295,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Result rows are materialized by this fragment instance in its own thread. They are /// materialized into a QueryResultSet provided to the coordinator during GetNext(). /// - /// Created during fragment instance start-up by FragmentExecState and set here in - /// Exec(). 
Keep a shared_ptr reference to the fragment state so that root_sink_ will - /// be valid for the lifetime of the coordinator. This is important if, for example, the - /// fragment instance is cancelled while the coordinator is calling GetNext(). - std::shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance_; - PlanFragmentExecutor* executor_ = nullptr; - PlanRootSink* root_sink_ = nullptr; + /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied + /// reference of QueryState released) in TearDown(). + FragmentInstanceState* coord_instance_ = nullptr; + + /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when + /// GetNext() hits eos. + PlanRootSink* coord_sink_ = nullptr; /// Query mem tracker for this coordinator initialized in Exec(). Only valid if there /// is no coordinator fragment (i.e. executor_ == NULL). If executor_ is not NULL, @@ -314,6 +315,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Returns a local object pool. ObjectPool* obj_pool() { return obj_pool_.get(); } + PlanFragmentExecutor* executor(); + // Sets the TDescriptorTable(s) for the current fragment. void SetExecPlanDescriptorTable(const TPlanFragment& fragment, TExecPlanFragmentParams* rpc_params); @@ -501,11 +504,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save void InitExecSummary(); /// Update fragment profile information from a fragment instance state. - void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state); + void UpdateAverageProfile(InstanceState* instance_state); /// Compute the summary stats (completion_time and rates) /// for an individual fragment_profile_ based on the specified instance state. - void ComputeFragmentSummaryStats(FragmentInstanceState* fragment_instance_state); + void ComputeFragmentSummaryStats(InstanceState* instance_state); /// Outputs aggregate query profile summary. This is assumed to be called at the end of /// a query -- remote fragments' profiles must not be updated while this is running. @@ -513,7 +516,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Populates the summary execution stats from the profile. Can only be called when the /// query is done. - void UpdateExecSummary(const FragmentInstanceState& instance_state); + void UpdateExecSummary(const InstanceState& instance_state); /// Determines what the permissions of directories created by INSERT statements should /// be if permission inheritance is enabled. 
Populates a map from all prefixes of http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index d53a146..c76afb5 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -28,6 +28,7 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/data-stream-mgr.h" +#include "runtime/exec-env.h" #include "runtime/data-stream-sender.h" #include "runtime/data-stream-recvr.h" #include "runtime/descriptors.h" @@ -112,10 +113,12 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf { class DataStreamTest : public testing::Test { protected: - DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) { + DataStreamTest() + : runtime_state_(TQueryCtx(), &exec_env_), + next_val_(0) { // Initialize Mem trackers for use by the data stream receiver. exec_env_.InitForFeTests(); - runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1); + runtime_state_.InitMemTrackers(NULL, -1); // Stop tests that rely on mismatched sender / receiver pairs timing out from failing. FLAGS_datastream_sender_timeout_ms = 250; @@ -480,9 +483,9 @@ class DataStreamTest : public testing::Test { void Sender(int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { - RuntimeState state(TExecPlanFragmentParams(), &exec_env_); + RuntimeState state(TQueryCtx(), &exec_env_); state.set_desc_tbl(desc_tbl_); - state.InitMemTrackers(TUniqueId(), NULL, -1); + state.InitMemTrackers(NULL, -1); VLOG_QUERY << "create sender " << sender_num; const TDataStreamSink& sink = GetSink(partition_type); DataStreamSender sender( @@ -593,9 +596,8 @@ TEST_F(DataStreamTest, BasicTest) { // // TODO: Make lifecycle requirements more explicit. 
TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) { - scoped_ptr<RuntimeState> runtime_state( - new RuntimeState(TExecPlanFragmentParams(), &exec_env_)); - runtime_state->InitMemTrackers(TUniqueId(), NULL, -1); + scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_)); + runtime_state->InitMemTrackers(NULL, -1); scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver")); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index db0d242..ad996e3 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -36,10 +36,10 @@ #include "runtime/lib-cache.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" +#include "runtime/query-exec-mgr.h" #include "runtime/tmp-file-mgr.h" #include "scheduling/request-pool-service.h" #include "scheduling/simple-scheduler.h" -#include "service/fragment-mgr.h" #include "service/frontend.h" #include "statestore/statestore-subscriber.h" #include "util/debug-util.h" @@ -146,7 +146,7 @@ ExecEnv::ExecEnv() fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), - fragment_mgr_(new FragmentMgr()), + query_exec_mgr_(new QueryExecMgr()), enable_webserver_(FLAGS_enable_webserver), is_fe_tests_(false), backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { @@ -199,7 +199,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), - fragment_mgr_(new FragmentMgr()), + query_exec_mgr_(new QueryExecMgr()), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), is_fe_tests_(false), backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index d4cdf24..08ddd9f 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -31,7 +31,7 @@ namespace impala { class CallableThreadPool; class DataStreamMgr; class DiskIoMgr; -class FragmentMgr; +class QueryExecMgr; class Frontend; class HBaseTableFactory; class HdfsFsCache; @@ -96,7 +96,7 @@ class ExecEnv { Frontend* frontend() { return frontend_.get(); } RequestPoolService* request_pool_service() { return request_pool_service_.get(); } CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); } - FragmentMgr* fragment_mgr() { return fragment_mgr_.get(); } + QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); } void set_enable_webserver(bool enable) { enable_webserver_ = enable; } @@ -138,7 +138,7 @@ class ExecEnv { boost::scoped_ptr<Frontend> frontend_; boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_; boost::scoped_ptr<CallableThreadPool> async_rpc_pool_; - boost::scoped_ptr<FragmentMgr> fragment_mgr_; + boost::scoped_ptr<QueryExecMgr> query_exec_mgr_; /// Not owned by this class ImpalaServer* impala_server_; 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc new file mode 100644 index 0000000..5dcd4c6 --- /dev/null +++ b/be/src/runtime/fragment-instance-state.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "runtime/fragment-instance-state.h" + +#include <boost/thread/locks.hpp> +#include <boost/thread/lock_guard.hpp> + +#include "runtime/exec-env.h" +#include "runtime/backend-client.h" +#include "runtime/runtime-filter-bank.h" +#include "runtime/client-cache.h" +#include "runtime/runtime-state.h" +#include "runtime/query-state.h" +#include "gen-cpp/ImpalaInternalService_types.h" + +using namespace impala; + +FragmentInstanceState::FragmentInstanceState( + QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl) + : query_state_(query_state), + fragment_ctx_(fragment_ctx), + instance_ctx_(instance_ctx), + desc_tbl_(desc_tbl), + executor_( + [this](const Status& status, RuntimeProfile* profile, bool done) { + ReportStatusCb(status, profile, done); + }) { +} + +Status FragmentInstanceState::UpdateStatus(const Status& status) { + lock_guard<mutex> l(status_lock_); + if (!status.ok() && exec_status_.ok()) exec_status_ = status; + return exec_status_; +} + +Status FragmentInstanceState::Cancel() { + lock_guard<mutex> l(status_lock_); + RETURN_IF_ERROR(exec_status_); + executor_.Cancel(); + return Status::OK(); +} + +void FragmentInstanceState::Exec() { + Status status = + executor_.Prepare(query_state_, desc_tbl_, fragment_ctx_, instance_ctx_); + prepare_promise_.Set(status); + if (status.ok()) { + if (executor_.Open().ok()) { + executor_.Exec(); + } + } + executor_.Close(); +} + +void FragmentInstanceState::ReportStatusCb( + const Status& status, RuntimeProfile* profile, bool done) { + DCHECK(status.ok() || done); // if !status.ok() => done + Status exec_status = UpdateStatus(status); + + Status coord_status; + ImpalaBackendConnection coord( + ExecEnv::GetInstance()->impalad_client_cache(), coord_address(), &coord_status); + if (!coord_status.ok()) { + stringstream s; + s << "Couldn't get a client for " << coord_address() <<"\tReason: " + << coord_status.GetDetail(); + UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str()))); + return; + } + + TReportExecStatusParams params; + params.protocol_version = ImpalaInternalServiceVersion::V1; + params.__set_query_id(query_state_->query_ctx().query_id); + params.__set_fragment_instance_id(instance_ctx_.fragment_instance_id); + 
exec_status.SetTStatus(¶ms); + params.__set_done(done); + + if (profile != NULL) { + profile->ToThrift(¶ms.profile); + params.__isset.profile = true; + } + + RuntimeState* runtime_state = executor_.runtime_state(); + // If executor_ did not successfully prepare, runtime state may not have been set. + if (runtime_state != NULL) { + // Only send updates to insert status if fragment is finished, the coordinator + // waits until query execution is done to use them anyhow. + if (done) { + TInsertExecStatus insert_status; + + if (runtime_state->hdfs_files_to_move()->size() > 0) { + insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move()); + } + if (runtime_state->per_partition_status()->size() > 0) { + insert_status.__set_per_partition_status(*runtime_state->per_partition_status()); + } + + params.__set_insert_exec_status(insert_status); + } + + // Send new errors to coordinator + runtime_state->GetUnreportedErrors(&(params.error_log)); + } + params.__isset.error_log = (params.error_log.size() > 0); + + TReportExecStatusResult res; + Status rpc_status; + bool retry_is_safe; + // Try to send the RPC 3 times before failing. + for (int i = 0; i < 3; ++i) { + rpc_status = coord.DoRpc( + &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe); + if (rpc_status.ok()) { + rpc_status = Status(res.status); + break; + } + if (!retry_is_safe) break; + if (i < 2) SleepForMs(100); + } + if (!rpc_status.ok()) { + UpdateStatus(rpc_status); + executor_.Cancel(); + } +} + +void FragmentInstanceState::PublishFilter( + int32_t filter_id, const TBloomFilter& thrift_bloom_filter) { + VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id()) + << " filter_id=" << filter_id; + // Defensively protect against blocking forever in case there's some problem with + // Prepare(). + static const int WAIT_MS = 30000; + bool timed_out = false; + // Wait until Prepare() is done, so we know that the filter bank is set up. + // TODO: get rid of concurrency in the setup phase as part of the per-query exec rpc + Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out); + if (timed_out) { + LOG(ERROR) << "Unexpected timeout in PublishFilter()"; + return; + } + if (!prepare_status.ok()) return; + executor_.runtime_state()->filter_bank()->PublishGlobalFilter( + filter_id, thrift_bloom_filter); +} + +const TQueryCtx& FragmentInstanceState::query_ctx() const { + return query_state_->query_ctx(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h new file mode 100644 index 0000000..c938a31 --- /dev/null +++ b/be/src/runtime/fragment-instance-state.h @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H +#define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H + +#include "common/status.h" +#include "util/promise.h" +#include "runtime/plan-fragment-executor.h" + +#include "gen-cpp/ImpalaInternalService_types.h" + +namespace impala { + +class TPlanFragmentCtx; +class TPlanFragmentInstanceCtx; +class TBloomFilter; +class TUniqueId; +class TNetworkAddress; +class QueryState; +class PlanFragmentExecutor; +class RuntimeProfile; + +/// Collection of state specific to the execution of a particular fragment instance, +/// as well as manager of the execution of that fragment instance, including +/// set-up and tear-down. +/// Tear-down happens automatically in the d'tor and frees all memory allocated for +/// this fragment instance and closes all data streams. +/// +/// Aside from Cancel(), which may be called asynchronously, this class is not +/// thread-safe. +/// +/// TODO: +/// - merge PlanFragmentExecutor into this class +/// - move tear-down logic out of d'tor and into ReleaseResources() function +/// - as part of the per-query exec rpc, get rid of concurrency during setup +/// (and remove prepare_promise_) +/// - move ReportStatusCb() logic into PFE::SendReport() and get rid of the callback +class FragmentInstanceState { + public: + FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl); + + /// Frees up all resources allocated in Exec(). + /// It is an error to delete a FragmentInstanceState before Exec() returns. + ~FragmentInstanceState() { } + + /// Main loop of plan fragment execution. Blocks until execution finishes. + void Exec(); + + /// Returns current execution status, if there was an error. Otherwise cancels + /// the fragment and returns OK. + Status Cancel(); + + /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank. + void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter); + + QueryState* query_state() { return query_state_; } + PlanFragmentExecutor* executor() { return &executor_; } + const TQueryCtx& query_ctx() const; + const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; } + const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; } + const TUniqueId& query_id() const { return query_ctx().query_id; } + const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; } + const TNetworkAddress& coord_address() const { return query_ctx().coord_address; } + + private: + QueryState* query_state_; + const TPlanFragmentCtx fragment_ctx_; + const TPlanFragmentInstanceCtx instance_ctx_; + + /// instance-specific descriptor table + /// TODO: remove when switching to per-query exec rpc + const TDescriptorTable desc_tbl_; + + PlanFragmentExecutor executor_; + + /// protects exec_status_ + boost::mutex status_lock_; + + /// set in ReportStatusCb(); + /// if set to anything other than OK, execution has terminated w/ an error + Status exec_status_; + + /// Barrier for the completion of executor_.Prepare(). + Promise<Status> prepare_promise_; + + /// Update 'exec_status_' w/ 'status', if the former is not already an error. + /// Returns the value of 'exec_status_' after this method completes. 
+ Status UpdateStatus(const Status& status); + + /// Callback for executor; updates exec_status_ if 'status' indicates an error + /// or if there was a thrift error. + /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of + /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created + /// for this fragment (e.g. when the fragment has failed during preparation). + /// The executor must ensure that there is only one invocation at a time. + void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index 3657882..464dbce 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -37,7 +37,9 @@ #include "runtime/descriptors.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" +#include "runtime/query-state.h" #include "runtime/runtime-filter-bank.h" +#include "runtime/exec-env.h" #include "util/container-util.h" #include "runtime/runtime-state.h" #include "util/cpu-info.h" @@ -69,9 +71,8 @@ const string EXEC_TIMER_NAME = "ExecTime"; } PlanFragmentExecutor::PlanFragmentExecutor( - ExecEnv* exec_env, const ReportStatusCallback& report_status_cb) - : exec_env_(exec_env), - exec_tree_(NULL), + const ReportStatusCallback& report_status_cb) + : exec_tree_(NULL), report_status_cb_(report_status_cb), report_thread_active_(false), closed_(false), @@ -92,8 +93,10 @@ PlanFragmentExecutor::~PlanFragmentExecutor() { DCHECK(!report_thread_active_); } -Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { - Status status = PrepareInternal(request); +Status PlanFragmentExecutor::Prepare( + QueryState* query_state, const TDescriptorTable& desc_tbl, + const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) { + Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx); prepared_promise_.Set(status); if (!status.ok()) FragmentComplete(status); return status; @@ -105,7 +108,9 @@ Status PlanFragmentExecutor::WaitForOpen() { return opened_promise_.Get(); } -Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) { +Status PlanFragmentExecutor::PrepareInternal( + QueryState* qs, const TDescriptorTable& tdesc_tbl, + const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) { lock_guard<mutex> l(prepare_lock_); DCHECK(!is_prepared_); @@ -113,19 +118,17 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ is_prepared_ = true; // TODO: Break this method up. 
- const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx; - query_id_ = request.query_ctx.query_id; + query_id_ = qs->query_ctx().query_id; - VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id=" - << PrintId(request.fragment_instance_ctx.fragment_instance_id); - VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx); - - DCHECK(request.__isset.fragment_ctx); + VLOG_QUERY << "Prepare(): instance_id=" + << PrintId(instance_ctx.fragment_instance_id); + VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx); // Prepare() must not return before runtime_state_ is set if is_prepared_ was // set. Having runtime_state_.get() != NULL is a postcondition of this method in that // case. Do not call RETURN_IF_ERROR or explicitly return before this line. - runtime_state_.reset(new RuntimeState(request, exec_env_)); + runtime_state_.reset( + new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance())); // total_time_counter() is in the runtime_state_ so start it up now. SCOPED_TIMER(profile()->total_time_counter()); @@ -143,9 +146,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); } - DCHECK(!fragment_instance_ctx.request_pool.empty()); - runtime_state_->InitMemTrackers( - query_id_, &fragment_instance_ctx.request_pool, bytes_limit); + DCHECK(!instance_ctx.request_pool.empty()); + runtime_state_->InitMemTrackers(&instance_ctx.request_pool, bytes_limit); RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); runtime_state_->InitFilterBank(); @@ -167,27 +169,21 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ // set up desc tbl DescriptorTbl* desc_tbl = NULL; - DCHECK(request.__isset.query_ctx); - DCHECK(request.query_ctx.__isset.desc_tbl); - RETURN_IF_ERROR( - DescriptorTbl::Create(obj_pool(), request.query_ctx.desc_tbl, &desc_tbl)); + RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl)); runtime_state_->set_desc_tbl(desc_tbl); - VLOG_QUERY << "descriptor table for fragment=" - << request.fragment_instance_ctx.fragment_instance_id + VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id << "\n" << desc_tbl->DebugString(); // set up plan - DCHECK(request.__isset.fragment_ctx); RETURN_IF_ERROR(ExecNode::CreateTree( - runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_)); + runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_)); runtime_state_->set_fragment_root_id(exec_tree_->id()); - if (fragment_instance_ctx.__isset.debug_node_id) { - DCHECK(fragment_instance_ctx.__isset.debug_action); - DCHECK(fragment_instance_ctx.__isset.debug_phase); - ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id, - fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, - exec_tree_); + if (instance_ctx.__isset.debug_node_id) { + DCHECK(instance_ctx.__isset.debug_action); + DCHECK(instance_ctx.__isset.debug_phase); + ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase, + instance_ctx.debug_action, exec_tree_); } // set #senders of exchange nodes before calling Prepare() @@ -195,8 +191,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); for (ExecNode* exch_node : exch_nodes) { DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); - int num_senders 
= FindWithDefault(fragment_instance_ctx.per_exch_num_senders, - exch_node->id(), 0); + int num_senders = + FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0); DCHECK_GT(num_senders, 0); static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders); } @@ -208,7 +204,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ for (int i = 0; i < scan_nodes.size(); ++i) { ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]); const vector<TScanRangeParams>& scan_ranges = FindWithDefault( - fragment_instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges); scan_node->SetScanRanges(scan_ranges); } @@ -220,13 +216,13 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ RETURN_IF_ERROR(exec_tree_->Prepare(state)); } - PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges); + PrintVolumeIds(instance_ctx.per_node_scan_ranges); - DCHECK(request.fragment_ctx.fragment.__isset.output_sink); + DCHECK(fragment_ctx.fragment.__isset.output_sink); RETURN_IF_ERROR( - DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink, - request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx, - exec_tree_->row_desc(), &sink_)); + DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink, + fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(), + &sink_)); RETURN_IF_ERROR( sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker())); @@ -235,7 +231,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ profile()->AddChild(sink_profile); } - if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) { + if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) { root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get()); // Release the thread token on the root fragment instance. This fragment spends most // of the time waiting and doing very little work. Holding on to the token causes http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index f93dab4..2194e58 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -84,7 +84,7 @@ class PlanFragmentExecutor { /// report_status_cb, if !empty(), is used to report the accumulated profile /// information periodically during execution. - PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb); + PlanFragmentExecutor(const ReportStatusCallback& report_status_cb); /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec() /// indicated that execution is finished, or to delete one that has not been Close()'d @@ -103,11 +103,16 @@ class PlanFragmentExecutor { /// Status::CANCELLED; /// /// If Prepare() fails, it will invoke final status callback with the error status. 
- Status Prepare(const TExecPlanFragmentParams& request); + /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we + /// have a single descriptor table to cover all fragment instances); at the moment + /// we need to pass the TDescriptorTable explicitly + Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl, + const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx); /// Opens the fragment plan and sink. Starts the profile reporting thread, if /// required. Can be called only if Prepare() succeeded. If Open() fails it will /// invoke the final status callback with the error status. + /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close() Status Open(); /// Executes the fragment by repeatedly driving the sink with batches produced by the @@ -153,7 +158,6 @@ class PlanFragmentExecutor { static const std::string PER_HOST_PEAK_MEM_COUNTER; private: - ExecEnv* exec_env_; // not owned ExecNode* exec_tree_; // lives in runtime_state_->obj_pool() TUniqueId query_id_; @@ -278,7 +282,10 @@ class PlanFragmentExecutor { Status ExecInternal(); /// Performs all the logic of Prepare() and returns resulting status. - Status PrepareInternal(const TExecPlanFragmentParams& request); + /// TODO: remove desc_tbl parameter as part of per-query exec rpc + Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl, + const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& instance_ctx); /// Releases the thread token for this fragment executor. void ReleaseThreadToken(); @@ -288,7 +295,6 @@ class PlanFragmentExecutor { void StopReportThread(); /// Print stats about scan ranges for each volumeId in params to info log. - void PrintVolumeIds(const TPlanExecParams& params); void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges); const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc new file mode 100644 index 0000000..4a3742c --- /dev/null +++ b/be/src/runtime/query-exec-mgr.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
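
The status-reporting path in fragment-instance-state.cc above retries the ReportExecStatus RPC up to three times, sleeping 100 ms between attempts and giving up early when the RPC layer says a retry is not safe. The following is a minimal standalone sketch of that retry shape only; Status, FakeReportRpc and the other names below are simplified stand-ins, not Impala's actual classes.

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

// Simplified stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Stand-in for the ReportExecStatus RPC: fails twice, then succeeds, and
// tells the caller via 'retry_is_safe' whether another attempt is allowed.
Status FakeReportRpc(int attempt, bool* retry_is_safe) {
  *retry_is_safe = true;
  if (attempt < 2) return Status{"transient send failure"};
  return Status{};
}

int main() {
  Status rpc_status;
  bool retry_is_safe = false;
  // Same loop shape as ReportStatusCb(): at most 3 attempts, 100 ms apart,
  // stop early on success or when a retry is not safe.
  for (int i = 0; i < 3; ++i) {
    rpc_status = FakeReportRpc(i, &retry_is_safe);
    if (rpc_status.ok()) break;
    if (!retry_is_safe) break;
    if (i < 2) std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  std::cout << (rpc_status.ok() ? std::string("report delivered")
                                : "report failed: " + rpc_status.msg) << std::endl;
  return rpc_status.ok() ? 0 : 1;
}
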
+ + +#include "runtime/query-exec-mgr.h" + +#include <gperftools/malloc_extension.h> +#include <gutil/strings/substitute.h> +#include <boost/thread/locks.hpp> +#include <boost/thread/lock_guard.hpp> + +#include "common/logging.h" +#include "runtime/query-state.h" +#include "runtime/fragment-instance-state.h" +#include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" +#include "util/uid-util.h" +#include "util/thread.h" +#include "util/impalad-metrics.h" +#include "util/debug-util.h" + +using boost::lock_guard; +using namespace impala; + +// TODO: this logging should go into a per query log. +DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage " + "every log_mem_usage_interval'th fragment completion."); + +Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) { + TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id; + VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id) + << " coord=" << params.query_ctx.coord_address; + + // Starting a new fragment instance creates a thread and consumes a non-trivial + // amount of memory. If we are already starved for memory, cancel the instance as + // early as possible to avoid digging the hole deeper. + MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); + if (process_mem_tracker->LimitExceeded()) { + string msg = Substitute("Instance $0 of plan fragment $1 could not " + "start because the backend Impala daemon is over its memory limit", + PrintId(instance_id), params.fragment_ctx.fragment.display_name); + return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); + } + + QueryState* qs = nullptr; + int refcnt; + { + lock_guard<mutex> l(qs_map_lock_); + TUniqueId query_id = params.query_ctx.query_id; + auto it = qs_map_.find(query_id); + if (it == qs_map_.end()) { + // register new QueryState + qs = new QueryState(params.query_ctx); + qs_map_.insert(make_pair(query_id, qs)); + VLOG_QUERY << "new QueryState: query_id=" << query_id; + } else { + qs = it->second; + } + // decremented at the end of ExecFInstance() + refcnt = qs->refcnt_.Add(1); + } + DCHECK(qs != nullptr && qs->refcnt_.Load() > 0); + VLOG_QUERY << "QueryState: query_id=" << params.query_ctx.query_id + << " refcnt=" << refcnt; + + DCHECK(params.__isset.fragment_ctx); + DCHECK(params.__isset.fragment_instance_ctx); + FragmentInstanceState* fis = qs->obj_pool()->Add( + new FragmentInstanceState(qs, params.fragment_ctx, params.fragment_instance_ctx, + params.query_ctx.desc_tbl)); + // register instance before returning so that async Cancel() calls can + // find the instance + qs->RegisterFInstance(fis); + // start new thread to execute instance + Thread t("query-exec-mgr", + Substitute("exec-fragment-instance-$0", PrintId(instance_id)), + &QueryExecMgr::ExecFInstance, this, fis); + t.Detach(); + + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); + return Status::OK(); +} + +void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) { + fis->Exec(); + + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); + VLOG_QUERY << "Instance completed. 
instance_id=" << PrintId(fis->instance_id()); + +#ifndef ADDRESS_SANITIZER + // tcmalloc and address sanitizer can not be used together + if (FLAGS_log_mem_usage_interval > 0) { + uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value(); + if (num_complete % FLAGS_log_mem_usage_interval == 0) { + char buf[2048]; + // This outputs how much memory is currently being used by this impalad + MallocExtension::instance()->GetStats(buf, 2048); + LOG(INFO) << buf; + } + } +#endif + + // decrement refcount taken in StartFInstance() + ReleaseQueryState(fis->query_state()); +} + +QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) { + VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id); + lock_guard<mutex> l(qs_map_lock_); + auto it = qs_map_.find(query_id); + if (it == qs_map_.end()) return nullptr; + QueryState* qs = it->second; + int32_t cnt = qs->refcnt_.Add(1); + DCHECK_GT(cnt, 0); + return qs; +} + +void QueryExecMgr::ReleaseQueryState(QueryState* qs) { + DCHECK(qs != nullptr); + TUniqueId query_id; + { + int32_t cnt = qs->refcnt_.Add(-1); + VLOG_QUERY << "ReleaseQueryState(): query_id=" << PrintId(qs->query_id()) + << " refcnt=" << cnt + 1; + DCHECK_GE(cnt, 0); + if (cnt > 0) return; + // don't reference anything from 'qs' beyond this point, 'qs' might get + // gc'd out from under us + query_id = qs->query_id(); + } + + { + // for now, gc right away + lock_guard<mutex> l(qs_map_lock_); + auto it = qs_map_.find(query_id); + // someone else might have gc'd the entry + if (it == qs_map_.end()) return; + qs = it->second; + DCHECK_EQ(qs->query_ctx().query_id, query_id); + int32_t cnt = qs->refcnt_.Load(); + DCHECK_GE(cnt, 0); + // someone else might have increased the refcnt in the meantime + if (cnt > 0) return; + size_t num_erased = qs_map_.erase(qs->query_ctx().query_id); + DCHECK_EQ(num_erased, 1); + } + // TODO: send final status report during gc, but do this from a different thread + delete qs; +} + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h new file mode 100644 index 0000000..7b3fb84 --- /dev/null +++ b/be/src/runtime/query-exec-mgr.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
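
query-exec-mgr.cc above keeps one QueryState per query in a map and manages its lifetime purely by reference counting: StartFInstance() and GetQueryState() take a reference, and ReleaseQueryState() drops one, re-checking the count under the map lock before deleting because another thread may have re-acquired the state in the meantime. A condensed standalone model of that pattern follows; QueryStateModel, QueryRegistry and the rest are illustrative names, not Impala's classes.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <unordered_map>

// Illustrative stand-in for QueryState: just an id and a refcount.
struct QueryStateModel {
  explicit QueryStateModel(int64_t id) : query_id(id), refcnt(0) {}
  int64_t query_id;
  std::atomic<int32_t> refcnt;
};

class QueryRegistry {
 public:
  // Returns the state for 'query_id' with its refcount already incremented,
  // creating it on first use (mirrors the registration in StartFInstance()).
  QueryStateModel* Acquire(int64_t query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(query_id);
    if (it == map_.end()) {
      it = map_.emplace(query_id, new QueryStateModel(query_id)).first;
    }
    it->second->refcnt.fetch_add(1);
    return it->second;
  }

  // Drops one reference; when the count hits zero, re-checks under the lock
  // (someone else may have re-acquired the state) before erasing and deleting.
  void Release(QueryStateModel* qs) {
    int64_t query_id = qs->query_id;  // don't touch 'qs' after the decrement
    if (qs->refcnt.fetch_sub(1) - 1 > 0) return;
    QueryStateModel* to_delete = nullptr;
    {
      std::lock_guard<std::mutex> l(lock_);
      auto it = map_.find(query_id);
      if (it == map_.end() || it->second->refcnt.load() > 0) return;
      to_delete = it->second;
      map_.erase(it);
    }
    delete to_delete;
  }

 private:
  std::mutex lock_;
  std::unordered_map<int64_t, QueryStateModel*> map_;
};

int main() {
  QueryRegistry registry;
  QueryStateModel* qs = registry.Acquire(42);    // e.g. a fragment instance start
  QueryStateModel* same = registry.Acquire(42);  // e.g. a concurrent lookup
  std::cout << "refcnt=" << qs->refcnt.load() << std::endl;  // prints 2
  registry.Release(same);
  registry.Release(qs);  // state is erased and deleted here
  return 0;
}
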
+ + +#ifndef IMPALA_RUNTIME_QUERY_EXEC_MGR_H +#define IMPALA_RUNTIME_QUERY_EXEC_MGR_H + +#include <boost/thread/mutex.hpp> +#include <unordered_map> + +#include "common/status.h" +#include "util/uid-util.h" +#include "gen-cpp/Types_types.h" + +namespace impala { + +class QueryState; +class Thread; +class TExecPlanFragmentParams; +class TUniqueId; +class FragmentInstanceState; + +/// A daemon-wide registry and manager of QueryStates. This is the central +/// entry point for gaining refcounted access to a QueryState. It also initiates +/// fragment instance execution. +/// Thread-safe. +/// +/// TODO: as part of Impala-2550 (per-query exec rpc) +/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery() +class QueryExecMgr { + public: + /// Initiates execution of this fragment instance in a newly created thread. + /// Also creates a QueryState for this query, if none exists. + /// In both cases it increases the refcount prior to instance execution and decreases + /// it after execution finishes. + /// + /// Returns an error if there was some unrecoverable problem before the fragment + /// was started (like low memory). In that case, no QueryState is created or has its + /// refcount incremented. After this call returns, it is legal to call + /// FragmentInstanceState::Cancel() on this fragment instance, regardless of the + /// return value of this function. + Status StartFInstance(const TExecPlanFragmentParams& params); + + /// If a QueryState for the given query exists, increments that refcount and returns + /// the QueryState, otherwise returns nullptr. + QueryState* GetQueryState(const TUniqueId& query_id); + + /// Decrements the refcount for the given QueryState. + void ReleaseQueryState(QueryState* qs); + + private: + /// protects qs_map_ + boost::mutex qs_map_lock_; + + /// map from query id to QueryState (owned by us) + std::unordered_map<TUniqueId, QueryState*> qs_map_; + + /// Execute instance. + void ExecFInstance(FragmentInstanceState* fis); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc new file mode 100644 index 0000000..2757750 --- /dev/null +++ b/be/src/runtime/query-state.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
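
StartFInstance() above hands each fragment instance to a freshly created, detached thread, so the RPC handler returns immediately while ExecFInstance() runs the instance and drops the QueryState refcount when it finishes. Impala uses its own Thread wrapper; the sketch below shows only the hand-off shape with std::thread, and all names in it are illustrative.

#include <chrono>
#include <iostream>
#include <thread>

// Stand-in for FragmentInstanceState::Exec(): runs the whole instance.
void ExecInstance(int instance_id) {
  std::cout << "instance " << instance_id << " executing" << std::endl;
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  std::cout << "instance " << instance_id << " complete" << std::endl;
  // In QueryExecMgr::ExecFInstance() this is where the reference taken in
  // StartFInstance() would be released.
}

int main() {
  // Mirror of StartFInstance(): spawn, detach, return to the RPC caller.
  std::thread t(ExecInstance, 7);
  t.detach();
  std::cout << "start call returned; instance runs in the background" << std::endl;
  // Give the detached thread time to finish before the process exits
  // (a real daemon keeps running, so it does not need this).
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  return 0;
}
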
+ + +#include "runtime/query-state.h" + +#include <boost/thread/locks.hpp> +#include <boost/thread/lock_guard.hpp> + +#include "runtime/exec-env.h" +#include "runtime/fragment-instance-state.h" +#include "runtime/query-exec-mgr.h" + +using namespace impala; + +QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) { + DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr); + query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id); +} + +QueryState::ScopedRef::~ScopedRef() { + if (query_state_ == nullptr) return; + ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_); +} + +QueryState::QueryState(const TQueryCtx& query_ctx) + : query_ctx_(query_ctx), + refcnt_(0) { + TQueryOptions& query_options = query_ctx_.client_request.query_options; + // max_errors does not indicate how many errors in total have been recorded, but rather + // how many are distinct. It is defined as the sum of the number of generic errors and + // the number of distinct other errors. + if (query_options.max_errors <= 0) { + // TODO: fix linker error and uncomment this + //query_options_.max_errors = FLAGS_max_errors; + query_options.max_errors = 100; + } + if (query_options.batch_size <= 0) { + query_options.__set_batch_size(DEFAULT_BATCH_SIZE); + } +} + +void QueryState::RegisterFInstance(FragmentInstanceState* fis) { + VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id()); + lock_guard<mutex> l(fis_map_lock_); + DCHECK_EQ(fis_map_.count(fis->instance_id()), 0); + fis_map_.insert(make_pair(fis->instance_id(), fis)); +} + +FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) { + VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id); + lock_guard<mutex> l(fis_map_lock_); + auto it = fis_map_.find(instance_id); + return it != fis_map_.end() ? it->second : nullptr; +}
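
QueryState::ScopedRef above wraps the GetQueryState()/ReleaseQueryState() pair in an RAII guard, so a caller that only needs temporary access (for example, a short-lived RPC handler looking up an instance) cannot forget the release. A minimal standalone guard in the same spirit is sketched below; ToyQueryExecMgr and the other names are toy stand-ins, not Impala's classes.

#include <iostream>
#include <string>

// Minimal stand-ins for the manager and the state it hands out.
struct ToyQueryState { std::string query_id; };

struct ToyQueryExecMgr {
  ToyQueryState* GetQueryState(const std::string& query_id) {
    std::cout << "refcnt++ for " << query_id << std::endl;
    return new ToyQueryState{query_id};
  }
  void ReleaseQueryState(ToyQueryState* qs) {
    std::cout << "refcnt-- for " << qs->query_id << std::endl;
    delete qs;
  }
};

// RAII guard in the spirit of QueryState::ScopedRef: acquire in the
// constructor, release in the destructor. The real GetQueryState() can
// return nullptr for an unknown query, so the destructor checks for that.
class ScopedQueryRef {
 public:
  ScopedQueryRef(ToyQueryExecMgr* mgr, const std::string& query_id)
    : mgr_(mgr), qs_(mgr->GetQueryState(query_id)) {}
  ~ScopedQueryRef() { if (qs_ != nullptr) mgr_->ReleaseQueryState(qs_); }
  ToyQueryState* get() const { return qs_; }

  // Non-copyable: exactly one release per acquire.
  ScopedQueryRef(const ScopedQueryRef&) = delete;
  ScopedQueryRef& operator=(const ScopedQueryRef&) = delete;

 private:
  ToyQueryExecMgr* mgr_;
  ToyQueryState* qs_;
};

int main() {
  ToyQueryExecMgr mgr;
  {
    ScopedQueryRef ref(&mgr, "query-123");
    if (ref.get() != nullptr) std::cout << "using " << ref.get()->query_id << std::endl;
  }  // reference released here, even on early return
  return 0;
}
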
