IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend
This is an extension of the scheduler and coordinator for multi-threaded execution. It mainly removes the assumption of having one instance per fragment per host. The approach taken here is to create parallel data structures and control flow functions, where necessary, and otherwise to leave the existing single-instance logic in place. The parallel structures' and functions' names are prefixed with "Mt" to facilitate the enventual clean-up. Not much of an attempt was made to factor out common functionality between the Mt- and the single-threaded version, because the single-threaded version will disappear in a follow-on patch and refactoring the existing code to fit into two parallel functions from which it's being called might end up obscuring the code more than helping it. Also, this code is relatively stable and having two parallel paths won't cause much extra work (in terms of having to apply the same changes/fixes twice) in the medium term. Changes to data structures: - QuerySchedule: per-instance and per-fragment structs with complete execution parameters (instead of partially relying on TQueryExecRequest); the per-instance execution parameter struct is a child of the per-fragment parameter struct - explicit fragment id, with range 0..#fragments-1 (instead of relying on an index into an array in TQueryExecRequest) Excluded: - runtime filter handling - anything related to RM Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680 Reviewed-on: http://gerrit.cloudera.org:8080/4054 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a9b9933b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a9b9933b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a9b9933b Branch: refs/heads/master Commit: a9b9933b5f059bd908291fd94d6f6f4fb88eeb7a Parents: 5d9f5be Author: Marcel Kornacker <[email protected]> Authored: Mon Jul 18 08:37:21 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Thu Oct 6 00:20:36 2016 +0000 ---------------------------------------------------------------------- be/src/common/global-types.h | 1 + be/src/exec/exec-node.cc | 2 +- be/src/exec/union-node.cc | 2 +- be/src/runtime/coordinator.cc | 810 +++++++++++++------ be/src/runtime/coordinator.h | 103 ++- be/src/runtime/runtime-filter-bank.cc | 2 + be/src/scheduling/query-schedule.cc | 158 +++- be/src/scheduling/query-schedule.h | 155 +++- be/src/scheduling/scheduler.h | 10 +- be/src/scheduling/simple-scheduler.cc | 263 +++++- be/src/scheduling/simple-scheduler.h | 38 +- be/src/service/fragment-exec-state.cc | 1 - be/src/service/fragment-mgr.cc | 7 +- be/src/service/impala-server.cc | 15 +- be/src/service/query-exec-state.cc | 18 +- be/src/service/query-options.cc | 10 +- be/src/service/query-options.h | 2 +- be/src/util/container-util.h | 26 + be/src/util/uid-util-test.cc | 7 +- be/src/util/uid-util.h | 2 +- common/thrift/ExecStats.thrift | 10 +- common/thrift/Frontend.thrift | 32 +- common/thrift/ImpalaInternalService.thrift | 61 +- common/thrift/ImpalaService.thrift | 5 +- common/thrift/Planner.thrift | 9 +- common/thrift/Types.thrift | 1 + .../java/org/apache/impala/common/TreeNode.java | 16 + .../java/org/apache/impala/planner/Planner.java | 23 + .../org/apache/impala/service/Frontend.java | 260 ++++-- .../apache/impala/planner/PlannerTestBase.java | 6 +- 30 files changed, 1525 insertions(+), 530 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/common/global-types.h ---------------------------------------------------------------------- diff --git a/be/src/common/global-types.h b/be/src/common/global-types.h index d4a41a5..3111e94 100644 --- a/be/src/common/global-types.h +++ b/be/src/common/global-types.h @@ -27,5 +27,6 @@ typedef int TupleId; typedef int SlotId; typedef int TableId; typedef int PlanNodeId; +typedef int FragmentIdx; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 2dce0d3..837fc09 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -264,7 +264,7 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode, switch (tnode.node_type) { case TPlanNodeType::HDFS_SCAN_NODE: *node = pool->Add(new HdfsScanNode(pool, tnode, descs)); - if (state->query_options().mt_num_cores > 0) { + if (state->query_options().mt_dop > 0) { *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs)); } else { *node = pool->Add(new HdfsScanNode(pool, tnode, descs)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index 9fa8d28..8b56099 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -171,7 +171,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { // Evaluate and materialize the const expr lists exactly once. while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) { // Only evaluate the const expr lists by the first fragment instance. - if (state->fragment_ctx().fragment_instance_idx == 0) { + if (state->fragment_ctx().per_fragment_instance_idx == 0) { // Materialize expr results into row_batch. RETURN_IF_ERROR(EvalAndMaterializeExprs( const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index e2dd1a4..df4ad7b 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -66,6 +66,7 @@ #include "util/pretty-printer.h" #include "util/summary-util.h" #include "util/table-printer.h" +#include "util/uid-util.h" #include "gen-cpp/ImpalaInternalService.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/Frontend_types.h" @@ -111,10 +112,10 @@ struct DebugOptions { // If these debug options apply to the candidate fragment instance, returns true // otherwise returns false. - bool IsApplicable(int candidate_fragment_instance_idx) { + bool IsApplicable(int candidate_instance_state_idx) { if (phase == TExecNodePhase::INVALID) return false; return (instance_state_idx == -1 || - instance_state_idx == candidate_fragment_instance_idx); + instance_state_idx == candidate_instance_state_idx); } }; @@ -124,16 +125,35 @@ struct DebugOptions { /// - updates through UpdateFragmentExecStatus() class Coordinator::FragmentInstanceState { public: - FragmentInstanceState(int fragment_idx, const FragmentExecParams* params, - int instance_idx, ObjectPool* obj_pool) - : fragment_instance_id_(params->instance_ids[instance_idx]), - impalad_address_(params->hosts[instance_idx]), - total_split_size_(0), + // TODO-MT: remove this c'tor + FragmentInstanceState(FragmentIdx fragment_idx, const FragmentExecParams& params, + int per_fragment_instance_idx, ObjectPool* obj_pool) + : fragment_instance_id_(params.instance_ids[per_fragment_instance_idx]), fragment_idx_(fragment_idx), - instance_idx_(instance_idx), + per_fragment_instance_idx_(per_fragment_instance_idx), + impalad_address_(params.hosts[per_fragment_instance_idx]), + total_split_size_(0), rpc_sent_(false), done_(false), profile_created_(false), + profile_(NULL), + total_ranges_complete_(0), + rpc_latency_(0) { + const string& profile_name = Substitute("Instance $0 (host=$1)", + PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_)); + profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); + } + + FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) + : fragment_instance_id_(params.instance_id), + fragment_idx_(params.fragment().idx), + per_fragment_instance_idx_(params.per_fragment_instance_idx), + impalad_address_(params.host), + total_split_size_(0), + rpc_sent_(false), + done_(false), + profile_created_(false), + profile_(NULL), total_ranges_complete_(0), rpc_latency_(0) { const string& profile_name = Substitute("Instance $0 (host=$1)", @@ -161,26 +181,31 @@ class Coordinator::FragmentInstanceState { // The following getters do not require lock() to be held. const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; } + FragmentIdx fragment_idx() const { return fragment_idx_; } MonotonicStopWatch* stopwatch() { return &stopwatch_; } const TNetworkAddress& impalad_address() const { return impalad_address_; } int64_t total_split_size() const { return total_split_size_; } bool done() const { return done_; } - int fragment_idx() const { return fragment_idx_; } - int instance_idx() const { return instance_idx_; } + int per_fragment_instance_idx() const { return per_fragment_instance_idx_; } bool rpc_sent() const { return rpc_sent_; } int64_t rpc_latency() const { return rpc_latency_; } - // Getters below must be accessed with lock() held - RuntimeProfile* profile() { return profile_; } + mutex* lock() { return &lock_; } + + void set_status(const Status& status) { status_ = status; } + void set_done(bool done) { done_ = done; } + void set_rpc_latency(int64_t millis) { + DCHECK_EQ(rpc_latency_, 0); + rpc_latency_ = millis; + } + + // Return values of the following functions must be accessed with lock() held + RuntimeProfile* profile() const { return profile_; } + void set_profile(RuntimeProfile* profile) { profile_ = profile; } FragmentInstanceCounters* aggregate_counters() { return &aggregate_counters_; } ErrorLogMap* error_log() { return &error_log_; } Status* status() { return &status_; } - mutex* lock() { return &lock_; } - - void SetStatus(const Status& status) { status_ = status; } - void SetDone(bool done) { done_ = done; } - /// Registers that the fragment instance's profile has been created and initially /// populated. Returns whether the profile had already been initialised so that callers /// can tell if they are the first to do so. Not thread-safe. @@ -190,16 +215,17 @@ class Coordinator::FragmentInstanceState { return cur; } - void SetRpcLatency(int64_t millis) { - DCHECK_EQ(rpc_latency_, 0); - rpc_latency_ = millis; - } - private: /// The unique ID of this instance of this fragment (there may be many instance of the /// same fragment, but this ID uniquely identifies this FragmentInstanceState). TUniqueId fragment_instance_id_; + // Same as TPlanFragment.idx + FragmentIdx fragment_idx_; + + /// range: 0..<# instances of this fragment>-1 + int per_fragment_instance_idx_; + /// Wall clock timer for this fragment. MonotonicStopWatch stopwatch_; @@ -209,12 +235,6 @@ class Coordinator::FragmentInstanceState { /// Summed across all splits; in bytes. int64_t total_split_size_; - /// Fragment idx for this ExecState - int fragment_idx_; - - /// The 0-based instance idx. - int instance_idx_; - /// Protects fields below. Can be held while doing an RPC, so SpinLock is a bad idea. /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_ mutex lock_; @@ -268,10 +288,14 @@ class Coordinator::FilterState { disabled_(false) { } TBloomFilter* bloom_filter() { return bloom_filter_.get(); } - boost::unordered_set<int>* src_fragment_instance_idxs() { - return &src_fragment_instance_idxs_; + boost::unordered_set<int>* src_fragment_instance_state_idxs() { + return &src_fragment_instance_state_idxs_; + } + const boost::unordered_set<int>& src_fragment_instance_state_idxs() const { + return src_fragment_instance_state_idxs_; } std::vector<FilterTarget>* targets() { return &targets_; } + const std::vector<FilterTarget>& targets() const { return targets_; } int64_t first_arrival_time() const { return first_arrival_time_; } int64_t completion_time() const { return completion_time_; } const TPlanNodeId& src() const { return src_; } @@ -295,7 +319,7 @@ class Coordinator::FilterState { std::vector<FilterTarget> targets_; // Index into fragment_instance_states_ for source fragment instances. - boost::unordered_set<int> src_fragment_instance_idxs_; + boost::unordered_set<int> src_fragment_instance_state_idxs_; /// Number of remaining backends to hear from before filter is complete. int pending_count_; @@ -347,9 +371,10 @@ int64_t Coordinator::FragmentInstanceState::UpdateNumScanRangesCompleted() { return delta; } -Coordinator::Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env, +Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, RuntimeProfile::EventSequence* events) - : exec_env_(exec_env), + : schedule_(schedule), + exec_env_(exec_env), has_called_wait_(false), returned_all_results_(false), executor_(NULL), // Set in Prepare() @@ -358,7 +383,7 @@ Coordinator::Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env, obj_pool_(new ObjectPool()), query_events_(events), filter_routing_table_complete_(false), - filter_mode_(query_options.runtime_filter_mode), + filter_mode_(schedule.query_options().runtime_filter_mode), torn_down_(false) { } @@ -423,16 +448,16 @@ static void ProcessQueryOptions( << "because nodes cannot be cancelled in Close()"; } -Status Coordinator::Exec(QuerySchedule& schedule, - vector<ExprContext*>* output_expr_ctxs) { - const TQueryExecRequest& request = schedule.request(); - DCHECK_GT(request.fragments.size(), 0); +Status Coordinator::Exec(vector<ExprContext*>* output_expr_ctxs) { + const TQueryExecRequest& request = schedule_.request(); + DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0); needs_finalization_ = request.__isset.finalize_params; if (needs_finalization_) finalize_params_ = request.finalize_params; - VLOG_QUERY << "Exec() query_id=" << schedule.query_id(); + VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() + << " stmt=" << request.query_ctx.request.stmt; stmt_type_ = request.stmt_type; - query_id_ = schedule.query_id(); + query_id_ = schedule_.query_id(); desc_tbl_ = request.desc_tbl; query_ctx_ = request.query_ctx; @@ -443,51 +468,35 @@ Status Coordinator::Exec(QuerySchedule& schedule, SCOPED_TIMER(query_profile_->total_time_counter()); - const TNetworkAddress& coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port); - // After the coordinator fragment is started, it may call UpdateFilter() asynchronously, // which waits on this barrier for completion. - if (schedule.num_fragment_instances() > 0) { - exec_complete_barrier_.reset(new CountingBarrier(schedule.num_fragment_instances())); + // TODO: remove special treatment of coord fragment + int num_remote_instances = schedule_.GetNumRemoteFInstances(); + if (num_remote_instances > 0) { + exec_complete_barrier_.reset(new CountingBarrier(num_remote_instances)); } + num_remaining_fragment_instances_ = num_remote_instances; + + // TODO: move initial setup into a separate function; right now part of it + // (InitExecProfile()) depends on the coordinator fragment having been started + + // initialize progress updater + const string& str = Substitute("Query $0", PrintId(query_id_)); + progress_.Init(str, schedule_.num_scan_ranges()); // to keep things simple, make async Cancel() calls wait until plan fragment // execution has been initiated, otherwise we might try to cancel fragment // execution at Impala daemons where it hasn't even started lock_guard<mutex> l(lock_); - // we run the root fragment ourselves if it is unpartitioned - bool has_coordinator_fragment = - request.fragments[0].partition.type == TPartitionType::UNPARTITIONED; - + bool has_coordinator_fragment = schedule_.GetCoordFragment() != NULL; if (has_coordinator_fragment) { - executor_.reset(new PlanFragmentExecutor( - exec_env_, PlanFragmentExecutor::ReportStatusCallback())); - // If a coordinator fragment is requested (for most queries this will be the case, the - // exception is parallel INSERT queries), start this before starting any more plan + // Start this before starting any more plan // fragments, otherwise they start sending data before the local exchange node had a // chance to register with the stream mgr. // TODO: This is no longer necessary (see IMPALA-1599). Consider starting all // fragments in the same way with no coordinator special case. - if (filter_mode_ != TRuntimeFilterMode::OFF) { - UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0); - if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete(); - } - TExecPlanFragmentParams rpc_params; - SetExecPlanFragmentParams(schedule, request.fragments[0], - (*schedule.exec_params())[0], 0, 0, 0, coord, &rpc_params); - RETURN_IF_ERROR(executor_->Prepare(rpc_params)); - - // Prepare output_expr_ctxs before optimizing the LLVM module. The other exprs of this - // coordinator fragment have been prepared in executor_->Prepare(). - DCHECK(output_expr_ctxs != NULL); - RETURN_IF_ERROR(Expr::CreateExprTrees( - runtime_state()->obj_pool(), request.fragments[0].output_exprs, - output_expr_ctxs)); - MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker( - -1, "Output exprs", runtime_state()->instance_mem_tracker(), false)); - RETURN_IF_ERROR(Expr::Prepare( - *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker)); + RETURN_IF_ERROR(PrepareCoordFragment(output_expr_ctxs)); } else { // The coordinator instance may require a query mem tracker even if there is no // coordinator fragment. For example, result-caching tracks memory via the query mem @@ -500,7 +509,7 @@ Status Coordinator::Exec(QuerySchedule& schedule, query_limit = query_ctx_.request.query_options.mem_limit; } MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker( - schedule.request_pool(), exec_env_->process_mem_tracker()); + schedule_.request_pool(), exec_env_->process_mem_tracker()); query_mem_tracker_ = MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker); @@ -509,17 +518,29 @@ Status Coordinator::Exec(QuerySchedule& schedule, filter_mem_tracker_.reset( new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false)); - // Initialize the execution profile structures. - InitExecProfile(request); + // initialize execution profile structures + bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; + if (is_mt_execution) { + MtInitExecProfiles(); + MtInitExecSummary(); + } else { + InitExecProfile(request); + } - // Once remote fragments are started, they can start making ReportExecStatus RPCs, - // which will update the progress updater. So initialize it before starting remote - // fragments. - const string& str = Substitute("Query $0", PrintId(query_id_)); - progress_.Init(str, schedule.num_scan_ranges()); + if (num_remote_instances > 0) { + // pre-size fragment_instance_states_ in order to directly address by instance idx + // when creating FragmentInstanceStates (instead of push_back()) + int num_fragment_instances = schedule_.GetTotalFInstances(); + DCHECK_GT(num_fragment_instances, 0); + fragment_instance_states_.resize(num_fragment_instances); + + if (is_mt_execution) { + MtStartRemoteFInstances(); + } else { + StartRemoteFragments(); + } + RETURN_IF_ERROR(FinishRemoteInstanceStartup()); - if (schedule.num_fragment_instances() > 0) { - RETURN_IF_ERROR(StartRemoteFragments(&schedule)); // If we have a coordinator fragment and remote fragments (the common case), release // the thread token on the coordinator fragment. This fragment spends most of the time // waiting and doing very little work. Holding on to the token causes underutilization @@ -533,7 +554,7 @@ Status Coordinator::Exec(QuerySchedule& schedule, } void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, - int num_hosts, int start_fragment_instance_idx) { + int num_hosts, int start_fragment_instance_state_idx) { DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) << "UpdateFilterRoutingTable() called although runtime filters are disabled"; DCHECK(!filter_routing_table_complete_) @@ -555,7 +576,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, f->set_pending_count(pending_count); vector<int> src_idxs; for (int i = 0; i < num_hosts; ++i) { - src_idxs.push_back(start_fragment_instance_idx + i); + src_idxs.push_back(start_fragment_instance_state_idx + i); } // If this is a broadcast join with only non-local targets, build and publish it @@ -567,7 +588,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, random_shuffle(src_idxs.begin(), src_idxs.end()); src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS); } - f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end()); + f->src_fragment_instance_state_idxs()->insert(src_idxs.begin(), src_idxs.end()); } else if (plan_node.__isset.hdfs_scan_node) { auto it = filter.planid_to_target_ndx.find(plan_node.node_id); DCHECK(it != filter.planid_to_target_ndx.end()); @@ -577,7 +598,8 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, } FilterTarget target(tFilterTarget); for (int i = 0; i < num_hosts; ++i) { - target.fragment_instance_idxs.insert(start_fragment_instance_idx + i); + target.fragment_instance_state_idxs.insert( + start_fragment_instance_state_idx + i); } f->targets()->push_back(target); } else { @@ -588,32 +610,82 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, } } -Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { - int32_t num_fragment_instances = schedule->num_fragment_instances(); - DCHECK_GT(num_fragment_instances , 0); +Status Coordinator::PrepareCoordFragment(vector<ExprContext*>* output_expr_ctxs) { + const TQueryExecRequest& request = schedule_.request(); + bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; + if (!is_mt_execution && filter_mode_ != TRuntimeFilterMode::OFF) { + UpdateFilterRoutingTable(schedule_.GetCoordFragment()->plan.nodes, 1, 0); + if (schedule_.GetNumFragmentInstances() == 0) MarkFilterRoutingTableComplete(); + } + + // create rpc params and FragmentInstanceState for the coordinator fragment + TExecPlanFragmentParams rpc_params; + FragmentInstanceState* coord_state = nullptr; + if (is_mt_execution) { + const FInstanceExecParams& coord_params = schedule_.GetCoordInstanceExecParams(); + MtSetExecPlanFragmentParams(coord_params, &rpc_params); + coord_state = obj_pool()->Add( + new FragmentInstanceState(coord_params, obj_pool())); + } else { + const TPlanFragment& coord_fragment = *schedule_.GetCoordFragment(); + SetExecPlanFragmentParams( + coord_fragment, schedule_.exec_params()[0], 0, &rpc_params); + coord_state = obj_pool()->Add( + new FragmentInstanceState( + coord_fragment.idx, schedule_.exec_params()[0], 0, obj_pool())); + // apparently this was never called for the coordinator fragment + // TODO: fix this + //exec_state->ComputeTotalSplitSize( + //rpc_params.fragment_instance_ctx.per_node_scan_ranges); + } + // register state before calling Prepare(), in case it fails + DCHECK_EQ(GetInstanceIdx(coord_state->fragment_instance_id()), 0); + fragment_instance_states_.push_back(coord_state); + DCHECK(coord_state != nullptr); + DCHECK_EQ(fragment_instance_states_.size(), 1); + executor_.reset(new PlanFragmentExecutor( + exec_env_, PlanFragmentExecutor::ReportStatusCallback())); + RETURN_IF_ERROR(executor_->Prepare(rpc_params)); + coord_state->set_profile(executor_->profile()); + + // Prepare output_expr_ctxs before optimizing the LLVM module. The other exprs of this + // coordinator fragment have been prepared in executor_->Prepare(). + DCHECK(output_expr_ctxs != NULL); + RETURN_IF_ERROR(Expr::CreateExprTrees( + runtime_state()->obj_pool(), schedule_.GetCoordFragment()->output_exprs, + output_expr_ctxs)); + MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker( + -1, "Output exprs", runtime_state()->instance_mem_tracker(), false)); + RETURN_IF_ERROR(Expr::Prepare( + *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker)); + + return Status::OK(); +} + +void Coordinator::StartRemoteFragments() { + int num_fragment_instances = schedule_.GetNumFragmentInstances(); DebugOptions debug_options; - ProcessQueryOptions(schedule->query_options(), &debug_options); - const TQueryExecRequest& request = schedule->request(); + ProcessQueryOptions(schedule_.query_options(), &debug_options); + const TQueryExecRequest& request = schedule_.request(); - fragment_instance_states_.resize(num_fragment_instances); - num_remaining_fragment_instances_ = num_fragment_instances; VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " << query_id_; - query_events_->MarkEvent( - Substitute("Ready to start $0 remote fragments", num_fragment_instances)); + Substitute("Ready to start $0 remote fragment instances", num_fragment_instances)); - int instance_state_idx = 0; bool has_coordinator_fragment = request.fragments[0].partition.type == TPartitionType::UNPARTITIONED; + int instance_state_idx = has_coordinator_fragment ? 1 : 0; int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0; if (filter_mode_ != TRuntimeFilterMode::OFF) { - // Populate the runtime filter routing table. This should happen before - // starting the remote fragments. + // Populate the runtime filter routing table. This should happen before starting + // the remote fragments. + // This code anticipates the indices of the instance states created later on in + // ExecRemoteFragment() for (int fragment_idx = first_remote_fragment_idx; fragment_idx < request.fragments.size(); ++fragment_idx) { - const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx]; - int num_hosts = params->hosts.size(); + const FragmentExecParams& params = schedule_.exec_params()[fragment_idx]; + int num_hosts = params.hosts.size(); DCHECK_GT(num_hosts, 0); UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts, instance_state_idx); @@ -622,13 +694,13 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { MarkFilterRoutingTableComplete(); } - instance_state_idx = 0; + int num_instances = 0; // Start one fragment instance per fragment per host (number of hosts running each // fragment may not be constant). for (int fragment_idx = first_remote_fragment_idx; fragment_idx < request.fragments.size(); ++fragment_idx) { - const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx]; - int num_hosts = params->hosts.size(); + const FragmentExecParams& params = schedule_.exec_params()[fragment_idx]; + int num_hosts = params.hosts.size(); DCHECK_GT(num_hosts, 0); fragment_profiles_[fragment_idx].num_instances = num_hosts; // Start one fragment instance for every fragment_instance required by the @@ -636,24 +708,62 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and // so on. for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts; - ++fragment_instance_idx) { + ++fragment_instance_idx, ++num_instances) { DebugOptions* fragment_instance_debug_options = debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL; exec_env_->fragment_exec_thread_pool()->Offer( - bind<void>(mem_fn(&Coordinator::ExecRemoteFragment), this, - params, // fragment_exec_params - &request.fragments[fragment_idx], // plan_fragment, - fragment_instance_debug_options, - schedule, - instance_state_idx++, - fragment_idx, - fragment_instance_idx)); + std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params), + std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options, + fragment_instance_idx)); + } + } + exec_complete_barrier_->Wait(); + query_events_->MarkEvent( + Substitute("All $0 remote fragments instances started", num_instances)); +} + +void Coordinator::MtStartRemoteFInstances() { + int num_fragment_instances = schedule_.GetNumFragmentInstances(); + DebugOptions debug_options; + ProcessQueryOptions(schedule_.query_options(), &debug_options); + const TQueryExecRequest& request = schedule_.request(); + + VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " + << query_id_; + query_events_->MarkEvent( + Substitute("Ready to start $0 remote fragment instances", num_fragment_instances)); + + // TODO: populate the runtime filter routing table + // this requires local aggregation of filters prior to sending + // for broadcast joins in order to avoid more complicated merge logic here + + int num_instances = 0; + for (const MtFragmentExecParams& fragment_params: schedule_.mt_fragment_exec_params()) { + if (fragment_params.is_coord_fragment) continue; + for (int i = 0; i < fragment_params.instance_exec_params.size(); + ++i, ++num_instances) { + const FInstanceExecParams& instance_params = + fragment_params.instance_exec_params[i]; + FragmentInstanceState* exec_state = obj_pool()->Add( + new FragmentInstanceState(instance_params, obj_pool())); + int instance_state_idx = GetInstanceIdx(instance_params.instance_id); + fragment_instance_states_[instance_state_idx] = exec_state; + + DebugOptions* instance_debug_options = + debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL; + exec_env_->fragment_exec_thread_pool()->Offer( + std::bind(&Coordinator::MtExecRemoteFInstance, + this, std::cref(instance_params), instance_debug_options)); } } exec_complete_barrier_->Wait(); + VLOG_QUERY << "started " << num_fragment_instances << " fragment instances for query " + << query_id_; query_events_->MarkEvent( - Substitute("All $0 remote fragments started", instance_state_idx)); + Substitute("All $0 remote fragment instances started", num_instances)); +} +Status Coordinator::FinishRemoteInstanceStartup() { Status status = Status::OK(); const TMetricDef& def = MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); @@ -665,7 +775,8 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { latencies.Update(exec_state->rpc_latency()); } - query_profile_->AddInfoString("Fragment start latencies", latencies.ToHumanReadable()); + query_profile_->AddInfoString( + "Fragment instance start latencies", latencies.ToHumanReadable()); if (!status.ok()) { DCHECK(query_status_.ok()); // nobody should have been able to cancel @@ -694,17 +805,17 @@ string Coordinator::FilterDebugString() { lock_guard<SpinLock> l(filter_lock_); for (FilterRoutingTable::value_type& v: filter_routing_table_) { vector<string> row; - FilterState& state = v.second; + const FilterState& state = v.second; row.push_back(lexical_cast<string>(v.first)); row.push_back(lexical_cast<string>(state.src())); vector<string> target_ids; vector<string> num_target_instances; vector<string> target_types; vector<string> partition_filter; - for (const FilterTarget& target: *state.targets()) { + for (const FilterTarget& target: state.targets()) { target_ids.push_back(lexical_cast<string>(target.node_id)); num_target_instances.push_back( - lexical_cast<string>(target.fragment_instance_idxs.size())); + lexical_cast<string>(target.fragment_instance_state_idxs.size())); target_types.push_back(target.is_local ? "LOCAL" : "REMOTE"); partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false"); } @@ -716,7 +827,7 @@ string Coordinator::FilterDebugString() { if (filter_mode_ == TRuntimeFilterMode::GLOBAL) { int pending_count = state.completion_time() != 0L ? 0 : state.pending_count(); row.push_back(Substitute("$0 ($1)", pending_count, - state.src_fragment_instance_idxs()->size())); + state.src_fragment_instance_state_idxs().size())); if (state.first_arrival_time() == 0L) { row.push_back("N/A"); } else { @@ -1041,7 +1152,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { } Status Coordinator::FinalizeQuery() { - // All backends must have reported their final statuses before finalization, which is a + // All instances must have reported their final statuses before finalization, which is a // post-condition of Wait. If the query was not successful, still try to clean up the // staging directory. DCHECK(has_called_wait_); @@ -1066,17 +1177,17 @@ Status Coordinator::FinalizeQuery() { return return_status; } -Status Coordinator::WaitForAllBackends() { +Status Coordinator::WaitForAllInstances() { unique_lock<mutex> l(lock_); while (num_remaining_fragment_instances_ > 0 && query_status_.ok()) { - VLOG_QUERY << "Coordinator waiting for backends to finish, " + VLOG_QUERY << "Coordinator waiting for fragment instances to finish, " << num_remaining_fragment_instances_ << " remaining"; - backend_completion_cv_.wait(l); + instance_completion_cv_.wait(l); } if (query_status_.ok()) { - VLOG_QUERY << "All backends finished successfully."; + VLOG_QUERY << "All fragment instances finished successfully."; } else { - VLOG_QUERY << "All backends finished due to one or more errors."; + VLOG_QUERY << "All fragment instances finished due to one or more errors."; } return query_status_; @@ -1100,8 +1211,8 @@ Status Coordinator::Wait() { RuntimeState* state = runtime_state(); DCHECK(state != NULL); - // No other backends should have updated these structures if the coordinator has a - // fragment. (Backends have a sink only if the coordinator does not) + // No other instances should have updated these structures if the coordinator has a + // fragment. (Instances have a sink only if the coordinator does not) DCHECK_EQ(files_to_move_.size(), 0); DCHECK_EQ(per_partition_status_.size(), 0); @@ -1110,13 +1221,13 @@ Status Coordinator::Wait() { per_partition_status_ = *state->per_partition_status(); } } else { - // Query finalization can only happen when all backends have reported + // Query finalization can only happen when all instances have reported // relevant state. They only have relevant state to report in the parallel // INSERT case, otherwise all the relevant state is from the coordinator // fragment which will be available after Open() returns. // Ignore the returned status if finalization is required., since FinalizeQuery() will // pick it up and needs to execute regardless. - Status status = WaitForAllBackends(); + Status status = WaitForAllInstances(); if (!needs_finalization_ && !status.ok()) return status; } @@ -1178,13 +1289,13 @@ Status Coordinator::GetNext(RowBatch** batch, RuntimeState* state) { } } - // Don't return final NULL until all backends have completed. - // GetNext must wait for all backends to complete before + // Don't return final NULL until all instances have completed. + // GetNext must wait for all instances to complete before // ultimately signalling the end of execution via a NULL // batch. After NULL is returned, the coordinator may tear down // query state, and perform post-query finalization which might - // depend on the reports from all backends. - RETURN_IF_ERROR(WaitForAllBackends()); + // depend on the reports from all instances. + RETURN_IF_ERROR(WaitForAllInstances()); if (query_status_.ok()) { // If the query completed successfully, report aggregate query profiles. ReportQuerySummary(); @@ -1218,14 +1329,13 @@ void Coordinator::ValidateCollectionSlots(RowBatch* batch) { } void Coordinator::PrintFragmentInstanceInfo() { - for (int i = 0; i < fragment_instance_states_.size(); ++i) { - SummaryStats& acc = - fragment_profiles_[fragment_instance_states_[i]->fragment_idx()].bytes_assigned; - acc(fragment_instance_states_[i]->total_split_size()); + for (FragmentInstanceState* state: fragment_instance_states_) { + SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned; + acc(state->total_split_size()); } - for (int i = (executor_.get() == NULL ? 0 : 1); i < fragment_profiles_.size(); ++i) { - SummaryStats& acc = fragment_profiles_[i].bytes_assigned; + for (int id = (executor_.get() == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) { + SummaryStats& acc = fragment_profiles_[id].bytes_assigned; double min = accumulators::min(acc); double max = accumulators::max(acc); double mean = accumulators::mean(acc); @@ -1235,13 +1345,12 @@ void Coordinator::PrintFragmentInstanceInfo() { << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES) << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES) << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES); - fragment_profiles_[i].averaged_profile->AddInfoString("split sizes", ss.str()); + fragment_profiles_[id].averaged_profile->AddInfoString("split sizes", ss.str()); if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Byte split for fragment " << i << " " << ss.str(); - for (int j = 0; j < fragment_instance_states_.size(); ++j) { - FragmentInstanceState* exec_state = fragment_instance_states_[j]; - if (exec_state->fragment_idx() != i) continue; + VLOG_FILE << "Byte split for fragment " << id << " " << ss.str(); + for (FragmentInstanceState* exec_state: fragment_instance_states_) { + if (exec_state->fragment_idx() != id) continue; VLOG_FILE << "data volume for ipaddress " << exec_state << ": " << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES); } @@ -1260,7 +1369,7 @@ void Coordinator::InitExecProfile(const TQueryExecRequest& request) { for (int j = 0; j < plan.nodes.size(); ++j) { TPlanNodeExecSummary node; node.node_id = plan.nodes[j].node_id; - node.fragment_id = i; + node.fragment_idx = i; node.label = plan.nodes[j].label; node.__set_label_detail(plan.nodes[j].label_detail); node.num_children = plan.nodes[j].num_children; @@ -1331,12 +1440,102 @@ void Coordinator::InitExecProfile(const TQueryExecRequest& request) { } } +void Coordinator::MtInitExecSummary() { + const TQueryExecRequest& request = schedule_.request(); + // init exec_summary_.{nodes, exch_to_sender_map} + exec_summary_.__isset.nodes = true; + DCHECK(exec_summary_.nodes.empty()); + for (const TPlanExecInfo& plan_exec_info: request.mt_plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + if (!fragment.__isset.plan) continue; + + // eventual index of fragment's root node in exec_summary_.nodes + int root_node_idx = exec_summary_.nodes.size(); + + const TPlan& plan = fragment.plan; + int num_instances = + schedule_.GetFragmentExecParams(fragment.idx).instance_exec_params.size(); + for (const TPlanNode& node: plan.nodes) { + plan_node_id_to_summary_map_[node.node_id] = exec_summary_.nodes.size(); + exec_summary_.nodes.emplace_back(); + TPlanNodeExecSummary& node_summary = exec_summary_.nodes.back(); + node_summary.__set_node_id(node.node_id); + node_summary.__set_fragment_idx(fragment.idx); + node_summary.__set_label(node.label); + node_summary.__set_label_detail(node.label_detail); + node_summary.__set_num_children(node.num_children); + if (node.__isset.estimated_stats) { + node_summary.__set_estimated_stats(node.estimated_stats); + } + node_summary.exec_stats.resize(num_instances); + } + + if (fragment.__isset.output_sink + && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) { + const TDataStreamSink& sink = fragment.output_sink.stream_sink; + int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id]; + if (sink.output_partition.type == TPartitionType::UNPARTITIONED) { + exec_summary_.nodes[exch_idx].__set_is_broadcast(true); + } + exec_summary_.__isset.exch_to_sender_map = true; + exec_summary_.exch_to_sender_map[exch_idx] = root_node_idx; + } + } + } +} + +void Coordinator::MtInitExecProfiles() { + const TQueryExecRequest& request = schedule_.request(); + vector<const TPlanFragment*> fragments; + schedule_.GetTPlanFragments(&fragments); + fragment_profiles_.resize(fragments.size()); + + // start with coordinator fragment, if there is one + const TPlanFragment* coord_fragment = schedule_.GetCoordFragment(); + if (coord_fragment != NULL) { + DCHECK(executor_.get() != NULL); + PerFragmentProfileData* data = &fragment_profiles_[coord_fragment->idx]; + data->num_instances = 1; + // TODO: fix this; this is not an averaged profile; we should follow the exact + // same structure we have for all other profiles (average + root + single + // instance profile) + data->averaged_profile = executor_->profile(); + + // register coordinator's fragment profile in the query profile now, before those + // of the fragment instances, so it shows up at the top + query_profile_->AddChild(executor_->profile()); + executor_->profile()->set_name(Substitute("Coordinator Fragment $0", + coord_fragment->display_name)); + CollectScanNodeCounters(executor_->profile(), &coordinator_counters_); + } + + // Initialize the runtime profile structure. This adds the per fragment average + // profiles followed by the per fragment instance profiles. + for (const TPlanFragment* fragment: fragments) { + if (fragment == coord_fragment) continue; + PerFragmentProfileData* data = &fragment_profiles_[fragment->idx]; + data->num_instances = + schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size(); + + data->averaged_profile = + obj_pool()->Add(new RuntimeProfile(obj_pool(), + Substitute("Averaged Fragment $0", fragment->display_name), true)); + query_profile_->AddChild(data->averaged_profile, true); + data->root_profile = + obj_pool()->Add(new RuntimeProfile(obj_pool(), + Substitute("Fragment $0", fragment->display_name))); + // Note: we don't start the wall timer here for the fragment profile; + // it's uninteresting and misleading. + query_profile_->AddChild(data->root_profile); + } +} + + void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, FragmentInstanceCounters* counters) { vector<RuntimeProfile*> children; profile->GetAllChildren(&children); - for (int i = 0; i < children.size(); ++i) { - RuntimeProfile* p = children[i]; + for (RuntimeProfile* p: children) { PlanNodeId id = ExecNode::GetNodeIdFromProfile(p); // This profile is not for an exec node. @@ -1355,28 +1554,87 @@ void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, } } -void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_params, - const TPlanFragment* plan_fragment, DebugOptions* debug_options, - QuerySchedule* schedule, int instance_state_idx, int fragment_idx, +void Coordinator::MtExecRemoteFInstance( + const FInstanceExecParams& exec_params, const DebugOptions* debug_options) { + NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); + TExecPlanFragmentParams rpc_params; + MtSetExecPlanFragmentParams(exec_params, &rpc_params); + if (debug_options != NULL) { + rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); + rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); + rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); + } + int instance_state_idx = GetInstanceIdx(exec_params.instance_id); + FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx]; + exec_state->ComputeTotalSplitSize( + rpc_params.fragment_instance_ctx.per_node_scan_ranges); + VLOG_FILE << "making rpc: ExecPlanFragment" + << " host=" << exec_state->impalad_address() + << " instance_id=" << PrintId(exec_state->fragment_instance_id()); + + // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns. + lock_guard<mutex> l(*exec_state->lock()); + int64_t start = MonotonicMillis(); + + Status client_connect_status; + ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(), + exec_state->impalad_address(), &client_connect_status); + if (!client_connect_status.ok()) { + exec_state->SetInitialStatus(client_connect_status); + return; + } + + TExecPlanFragmentResult thrift_result; + Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment, + rpc_params, &thrift_result); + exec_state->set_rpc_latency(MonotonicMillis() - start); + + const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2"; + + if (!rpc_status.ok()) { + const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), + PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg()); + VLOG_QUERY << err_msg; + exec_state->SetInitialStatus(Status(err_msg)); + return; + } + + Status exec_status = Status(thrift_result.status); + if (!exec_status.ok()) { + const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), + PrintId(exec_state->fragment_instance_id()), + exec_status.msg().GetFullMessageDetails()); + VLOG_QUERY << err_msg; + exec_state->SetInitialStatus(Status(err_msg)); + return; + } + + exec_state->SetInitialStatus(Status::OK()); + VLOG_FILE << "rpc succeeded: ExecPlanFragment" + << " instance_id=" << PrintId(exec_state->fragment_instance_id()); +} + +void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_params, + const TPlanFragment& plan_fragment, DebugOptions* debug_options, int fragment_instance_idx) { NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); TExecPlanFragmentParams rpc_params; - SetExecPlanFragmentParams(*schedule, *plan_fragment, *fragment_exec_params, - instance_state_idx, fragment_idx, fragment_instance_idx, - MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port), &rpc_params); + SetExecPlanFragmentParams( + plan_fragment, fragment_exec_params, fragment_instance_idx, &rpc_params); if (debug_options != NULL) { rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); } FragmentInstanceState* exec_state = obj_pool()->Add( - new FragmentInstanceState(fragment_idx, fragment_exec_params, fragment_instance_idx, - obj_pool())); + new FragmentInstanceState( + plan_fragment.idx, fragment_exec_params, fragment_instance_idx, obj_pool())); exec_state->ComputeTotalSplitSize( rpc_params.fragment_instance_ctx.per_node_scan_ranges); + int instance_state_idx = GetInstanceIdx(exec_state->fragment_instance_id()); fragment_instance_states_[instance_state_idx] = exec_state; - VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_ - << " instance_id=" << exec_state->fragment_instance_id() + VLOG_FILE << "making rpc: ExecPlanFragment" + << " instance_id=" << PrintId(exec_state->fragment_instance_id()) << " host=" << exec_state->impalad_address(); // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns. @@ -1395,13 +1653,14 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_par Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment, rpc_params, &thrift_result); - exec_state->SetRpcLatency(MonotonicMillis() - start); + exec_state->set_rpc_latency(MonotonicMillis() - start); - const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2"; + const string ERR_TEMPLATE = "ExecPlanRequest rpc instance_id=$0 failed: $1"; if (!rpc_status.ok()) { - const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), - PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg()); + const string& err_msg = + Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()), + rpc_status.msg().msg()); VLOG_QUERY << err_msg; exec_state->SetInitialStatus(Status(err_msg)); return; @@ -1409,9 +1668,9 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_par Status exec_plan_status = Status(thrift_result.status); if (!exec_plan_status.ok()) { - const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), - PrintId(exec_state->fragment_instance_id()), - exec_plan_status.msg().GetFullMessageDetails()); + const string& err_msg = + Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()), + exec_plan_status.msg().GetFullMessageDetails()); VLOG_QUERY << err_msg; exec_state->SetInitialStatus(Status(err_msg)); return; @@ -1444,8 +1703,9 @@ void Coordinator::CancelInternal() { } void Coordinator::CancelRemoteFragments() { - for (int i = 0; i < fragment_instance_states_.size(); ++i) { - FragmentInstanceState* exec_state = fragment_instance_states_[i]; + for (FragmentInstanceState* exec_state: fragment_instance_states_) { + DCHECK(exec_state != nullptr); + if (exec_state->fragment_idx() == 0) continue; // the coord fragment // If a fragment failed before we finished issuing all remote fragments, // this function will have been called before we finished populating @@ -1467,7 +1727,7 @@ void Coordinator::CancelRemoteFragments() { if (exec_state->done()) continue; // set an error status to make sure we only cancel this once - exec_state->SetStatus(Status::CANCELLED); + exec_state->set_status(Status::CANCELLED); // if we get an error while trying to get a connection to the backend, // keep going @@ -1507,14 +1767,15 @@ void Coordinator::CancelRemoteFragments() { } // notify that we completed with an error - backend_completion_cv_.notify_all(); + instance_completion_cv_.notify_all(); } Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) { - VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_ + VLOG_FILE << "UpdateFragmentExecStatus() " + << " instance=" << PrintId(params.fragment_instance_id) << " status=" << params.status.status_code << " done=" << (params.done ? "true" : "false"); - uint32_t instance_state_idx = params.instance_state_idx; + int instance_state_idx = GetInstanceIdx(params.fragment_instance_id); if (instance_state_idx >= fragment_instance_states_.size()) { return Status(TErrorCode::INTERNAL_ERROR, Substitute("Unknown fragment instance index $0 (max known: $1)", @@ -1531,15 +1792,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para // process a non-error message from a fragment executor that is sent // before query cancellation is invoked. Make sure we don't go from error status to // OK. - exec_state->SetStatus(status); + exec_state->set_status(status); } - exec_state->SetDone(params.done); + exec_state->set_done(params.done); if (exec_state->status()->ok()) { // We can't update this backend's profile if ReportQuerySummary() is running, // because it depends on all profiles not changing during its execution (when it // calls SortChildren()). ReportQuerySummary() only gets called after - // WaitForAllBackends() returns or at the end of CancelRemoteFragments(). - // WaitForAllBackends() only returns after all backends have completed (in which + // WaitForAllInstances() returns or at the end of CancelRemoteFragments(). + // WaitForAllInstances() only returns after all backends have completed (in which // case we wouldn't be in this function), or when there's an error, in which case // CancelRemoteFragments() is called. CancelRemoteFragments sets all exec_state's // statuses to cancelled. @@ -1550,8 +1811,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para // Update the average profile for the fragment corresponding to this instance. exec_state->profile()->ComputeTimeInProfile(); UpdateAverageProfile(exec_state); - UpdateExecSummary(exec_state->fragment_idx(), exec_state->instance_idx(), - exec_state->profile()); + UpdateExecSummary(*exec_state); } if (!exec_state->SetProfileCreated()) { CollectScanNodeCounters(exec_state->profile(), exec_state->aggregate_counters()); @@ -1615,16 +1875,14 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para lock_guard<mutex> l(lock_); exec_state->stopwatch()->Stop(); DCHECK_GT(num_remaining_fragment_instances_, 0); - VLOG_QUERY << "Fragment instance " << params.instance_state_idx << "(" - << exec_state->fragment_instance_id() << ") on host " - << exec_state->impalad_address() << " completed, " - << num_remaining_fragment_instances_ - 1 << " remaining: query_id=" - << query_id_; + VLOG_QUERY << "Fragment instance completed: " + << " id=" << PrintId(exec_state->fragment_instance_id()) + << " host=" << exec_state->impalad_address() + << " remaining=" << num_remaining_fragment_instances_ - 1; if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) { // print host/port info for the first backend that's still in progress as a // debugging aid for backend deadlocks - for (int i = 0; i < fragment_instance_states_.size(); ++i) { - FragmentInstanceState* exec_state = fragment_instance_states_[i]; + for (FragmentInstanceState* exec_state: fragment_instance_states_) { lock_guard<mutex> l2(*exec_state->lock()); if (!exec_state->done()) { VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: " @@ -1634,7 +1892,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para } } if (--num_remaining_fragment_instances_ == 0) { - backend_completion_cv_.notify_all(); + instance_completion_cv_.notify_all(); } } @@ -1666,7 +1924,7 @@ bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) { return catalog_update->created_partitions.size() != 0; } -// Comparator to order fragments by descending total time +// Comparator to order RuntimeProfiles by descending total time typedef struct { typedef pair<RuntimeProfile*, bool> Profile; bool operator()(const Profile& a, const Profile& b) const { @@ -1676,56 +1934,58 @@ typedef struct { } } InstanceComparator; -// Update fragment average profile information from a backend execution state. -void Coordinator::UpdateAverageProfile(FragmentInstanceState* fragment_instance_state) { - int fragment_idx = fragment_instance_state->fragment_idx(); +void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) { + FragmentIdx fragment_idx = instance_state->fragment_idx(); DCHECK_GE(fragment_idx, 0); DCHECK_LT(fragment_idx, fragment_profiles_.size()); - PerFragmentProfileData& data = fragment_profiles_[fragment_idx]; + PerFragmentProfileData* data = &fragment_profiles_[fragment_idx]; // No locks are taken since UpdateAverage() and AddChild() take their own locks - data.averaged_profile->UpdateAverage(fragment_instance_state->profile()); - data.root_profile->AddChild(fragment_instance_state->profile()); + data->averaged_profile->UpdateAverage(instance_state->profile()); + data->root_profile->AddChild(instance_state->profile()); } -// Compute fragment summary information from a backend execution state. -void Coordinator::ComputeFragmentSummaryStats( - FragmentInstanceState* fragment_instance_state) { - int fragment_idx = fragment_instance_state->fragment_idx(); +void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_state) { + FragmentIdx fragment_idx = instance_state->fragment_idx(); DCHECK_GE(fragment_idx, 0); DCHECK_LT(fragment_idx, fragment_profiles_.size()); - PerFragmentProfileData& data = fragment_profiles_[fragment_idx]; + PerFragmentProfileData* data = &fragment_profiles_[fragment_idx]; - int64_t completion_time = fragment_instance_state->stopwatch()->ElapsedTime(); - data.completion_times(completion_time); - data.rates(fragment_instance_state->total_split_size() / (completion_time / 1000.0 - / 1000.0 / 1000.0)); + int64_t completion_time = instance_state->stopwatch()->ElapsedTime(); + data->completion_times(completion_time); + data->rates(instance_state->total_split_size() + / (completion_time / 1000.0 / 1000.0 / 1000.0)); // Add the child in case it has not been added previously // via UpdateAverageProfile(). AddChild() will do nothing if the child // already exists. - data.root_profile->AddChild(fragment_instance_state->profile()); + data->root_profile->AddChild(instance_state->profile()); } -void Coordinator::UpdateExecSummary(int fragment_idx, int instance_idx, - RuntimeProfile* profile) { +void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) { vector<RuntimeProfile*> children; - profile->GetAllChildren(&children); + instance_state.profile()->GetAllChildren(&children); lock_guard<SpinLock> l(exec_summary_lock_); for (int i = 0; i < children.size(); ++i) { - int id = ExecNode::GetNodeIdFromProfile(children[i]); - if (id == -1) continue; + int node_id = ExecNode::GetNodeIdFromProfile(children[i]); + if (node_id == -1) continue; TPlanNodeExecSummary& exec_summary = - exec_summary_.nodes[plan_node_id_to_summary_map_[id]]; + exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]]; if (exec_summary.exec_stats.empty()) { // First time, make an exec_stats for each instance this plan node is running on. - DCHECK_LT(fragment_idx, fragment_profiles_.size()); - exec_summary.exec_stats.resize(fragment_profiles_[fragment_idx].num_instances); + // TODO-MT: remove this and initialize all runtime state prior to starting + // instances + DCHECK_LT(instance_state.fragment_idx(), fragment_profiles_.size()); + exec_summary.exec_stats.resize( + fragment_profiles_[instance_state.fragment_idx()].num_instances); } - DCHECK_LT(instance_idx, exec_summary.exec_stats.size()); - TExecStats& stats = exec_summary.exec_stats[instance_idx]; + DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size()); + DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances, + exec_summary.exec_stats.size()); + TExecStats& stats = + exec_summary.exec_stats[instance_state.per_fragment_instance_idx()]; RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned"); RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage"); @@ -1750,22 +2010,20 @@ void Coordinator::ReportQuerySummary() { // the query has made so little progress, reporting a summary is not very useful. if (!has_called_wait_) return; - // The fragment has finished executing. Update the profile to compute the - // fraction of time spent in each node. - if (executor_.get() != NULL) { - executor_->profile()->ComputeTimeInProfile(); - UpdateExecSummary(0, 0, executor_->profile()); - } - if (!fragment_instance_states_.empty()) { // Average all remote fragments for each fragment. - for (int i = 0; i < fragment_instance_states_.size(); ++i) { - fragment_instance_states_[i]->profile()->ComputeTimeInProfile(); - UpdateAverageProfile(fragment_instance_states_[i]); - ComputeFragmentSummaryStats(fragment_instance_states_[i]); - UpdateExecSummary(fragment_instance_states_[i]->fragment_idx(), - fragment_instance_states_[i]->instance_idx(), - fragment_instance_states_[i]->profile()); + for (FragmentInstanceState* state: fragment_instance_states_) { + // TODO: make profiles uniform across all fragments so we don't have + // to keep special-casing the coord fragment + if (state->fragment_idx() == 0) { + state->profile()->ComputeTimeInProfile(); + UpdateExecSummary(*state); + } else { + state->profile()->ComputeTimeInProfile(); + UpdateAverageProfile(state); + ComputeFragmentSummaryStats(state); + UpdateExecSummary(*state); + } } InstanceComparator comparator; @@ -1809,26 +2067,14 @@ void Coordinator::ReportQuerySummary() { // Map from Impalad address to peak memory usage of this query typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage; PerNodePeakMemoryUsage per_node_peak_mem_usage; - if (executor_.get() != NULL) { - // Coordinator fragment is not included in fragment_instance_states_. - RuntimeProfile::Counter* mem_usage_counter = - executor_->profile()->GetCounter( - PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER); - if (mem_usage_counter != NULL) { - TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port); - per_node_peak_mem_usage[coord] = mem_usage_counter->value(); - } - } - for (int i = 0; i < fragment_instance_states_.size(); ++i) { + for (FragmentInstanceState* state: fragment_instance_states_) { int64_t initial_usage = 0; int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage, - fragment_instance_states_[i]->impalad_address(), initial_usage); + state->impalad_address(), initial_usage); RuntimeProfile::Counter* mem_usage_counter = - fragment_instance_states_[i]->profile()->GetCounter( - PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER); + state->profile()->GetCounter(PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER); if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) { - per_node_peak_mem_usage[fragment_instance_states_[i]->impalad_address()] = - mem_usage_counter->value(); + per_node_peak_mem_usage[state->impalad_address()] = mem_usage_counter->value(); } } stringstream info; @@ -1844,25 +2090,49 @@ string Coordinator::GetErrorLog() { ErrorLogMap merged; { lock_guard<mutex> l(lock_); + // TODO-MT: use FragmentInstanceState::error_log_ instead + // as part of getting rid of the special-casing of the coordinator instance if (executor_.get() != NULL && executor_->runtime_state() != NULL) { ErrorLogMap runtime_error_log; executor_->runtime_state()->GetErrors(&runtime_error_log); MergeErrorMaps(&merged, runtime_error_log); } } - for (int i = 0; i < fragment_instance_states_.size(); ++i) { - lock_guard<mutex> l(*fragment_instance_states_[i]->lock()); - if (fragment_instance_states_[i]->error_log()->size() > 0) { - MergeErrorMaps(&merged, *fragment_instance_states_[i]->error_log()); - } + for (FragmentInstanceState* state: fragment_instance_states_) { + lock_guard<mutex> l(*state->lock()); + if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log()); } return PrintErrorMapToString(merged); } -void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, +void Coordinator::MtSetExecPlanFragmentParams( + const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) { + rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); + rpc_params->__set_query_ctx(query_ctx_); + + TPlanFragmentCtx fragment_ctx; + TPlanFragmentInstanceCtx fragment_instance_ctx; + + fragment_ctx.__set_fragment(params.fragment()); + // TODO: Remove filters that weren't selected during filter routing table construction. + SetExecPlanDescriptorTable(params.fragment(), rpc_params); + + fragment_instance_ctx.__set_request_pool(schedule_.request_pool()); + fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); + fragment_instance_ctx.__set_per_exch_num_senders( + params.fragment_exec_params.per_exch_num_senders); + fragment_instance_ctx.__set_destinations( + params.fragment_exec_params.destinations); + fragment_instance_ctx.__set_sender_id(params.sender_id); + fragment_instance_ctx.fragment_instance_id = params.instance_id; + fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; + rpc_params->__set_fragment_ctx(fragment_ctx); + rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); +} + +void Coordinator::SetExecPlanFragmentParams( const TPlanFragment& fragment, const FragmentExecParams& params, - int instance_state_idx, int fragment_idx, int fragment_instance_idx, - const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) { + int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params) { rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); rpc_params->__set_query_ctx(query_ctx_); @@ -1870,6 +2140,7 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, TPlanFragmentInstanceCtx fragment_instance_ctx; fragment_ctx.__set_fragment(fragment); + int instance_state_idx = GetInstanceIdx(params.instance_ids[per_fragment_instance_idx]); // Remove filters that weren't selected during filter routing table construction. if (filter_mode_ != TRuntimeFilterMode::OFF) { for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) { @@ -1879,10 +2150,10 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, FilterRoutingTable::iterator filter_it = filter_routing_table_.find(desc.filter_id); if (filter_it == filter_routing_table_.end()) continue; - FilterState* f = &filter_it->second; + const FilterState& f = filter_it->second; if (plan_node.__isset.hash_join_node) { - if (f->src_fragment_instance_idxs()->find(instance_state_idx) == - f->src_fragment_instance_idxs()->end()) { + if (f.src_fragment_instance_state_idxs().find(instance_state_idx) == + f.src_fragment_instance_state_idxs().end()) { DCHECK(desc.is_broadcast_join); continue; } @@ -1898,22 +2169,22 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, } SetExecPlanDescriptorTable(fragment, rpc_params); - TNetworkAddress exec_host = params.hosts[fragment_instance_idx]; + TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx]; FragmentScanRangeAssignment::const_iterator it = params.scan_range_assignment.find(exec_host); // Scan ranges may not always be set, so use an empty structure if so. const PerNodeScanRanges& scan_ranges = (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges(); - fragment_ctx.num_fragment_instances = params.instance_ids.size(); - fragment_instance_ctx.__set_request_pool(schedule.request_pool()); + fragment_instance_ctx.__set_request_pool(schedule_.request_pool()); fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges); fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders); fragment_instance_ctx.__set_destinations(params.destinations); - fragment_instance_ctx.__set_sender_id(params.sender_id_base + fragment_instance_idx); - fragment_instance_ctx.fragment_instance_id = params.instance_ids[fragment_instance_idx]; - fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx; - fragment_instance_ctx.instance_state_idx = instance_state_idx; + fragment_instance_ctx.__set_sender_id( + params.sender_id_base + per_fragment_instance_idx); + fragment_instance_ctx.fragment_instance_id = + params.instance_ids[per_fragment_instance_idx]; + fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx; rpc_params->__set_fragment_ctx(fragment_ctx); rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); } @@ -1987,10 +2258,16 @@ void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl); } + namespace { -void DistributeFilters(shared_ptr<TPublishFilterParams> params, TNetworkAddress impalad, - TUniqueId fragment_instance_id) { +// Make a PublishFilter rpc to 'impalad' for given fragment_instance_id +// and params. +// This takes by-value parameters because we cannot guarantee that the originating +// coordinator won't be destroyed while this executes. +// TODO: switch to references when we fix the lifecycle problems of coordinators. +void DistributeFilters(shared_ptr<TPublishFilterParams> params, + TNetworkAddress impalad, TUniqueId fragment_instance_id) { Status status; ImpalaBackendConnection backend_client( ExecEnv::GetInstance()->impalad_client_cache(), impalad, &status); @@ -2026,7 +2303,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts. shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams()); - unordered_set<int32_t> target_fragment_instance_idxs; + unordered_set<int> target_fragment_instance_state_idxs; { lock_guard<SpinLock> l(filter_lock_); FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id); @@ -2060,12 +2337,13 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { // No more updates are pending on this filter ID. Create a distribution payload and // offer it to the queue. - for (FilterTarget target: *state->targets()) { + for (const FilterTarget& target: *state->targets()) { // Don't publish the filter to targets that are in the same fragment as the join // that produced it. if (target.is_local) continue; - target_fragment_instance_idxs.insert(target.fragment_instance_idxs.begin(), - target.fragment_instance_idxs.end()); + target_fragment_instance_state_idxs.insert( + target.fragment_instance_state_idxs.begin(), + target.fragment_instance_state_idxs.end()); } // Assign outgoing bloom filter. @@ -2088,11 +2366,15 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { rpc_params->filter_id = params.filter_id; - for (int32_t target_idx: target_fragment_instance_idxs) { + for (int target_idx: target_fragment_instance_state_idxs) { FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx]; DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx; exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params, fragment_inst->impalad_address(), fragment_inst->fragment_instance_id())); + // TODO: switch back to the following once we fixed the lifecycle + // problems of Coordinator + //std::cref(fragment_inst->impalad_address()), + //std::cref(fragment_inst->fragment_instance_id()))); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 617ccb9..bb67377 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -66,6 +66,7 @@ class TPlanExecRequest; class TRuntimeProfileTree; class RuntimeProfile; class TablePrinter; +class TPlanFragment; struct DebugOptions; @@ -90,9 +91,16 @@ struct DebugOptions; // /// The implementation ensures that setting an overall error status and initiating /// cancellation of local and all remote fragments is atomic. +/// +/// TODO: move into separate subdirectory and move nested classes into separate files +/// and unnest them +/// +/// TODO: remove all data structures and functions that are superceded by their +/// multi-threaded counterpart and remove the "Mt" prefix with which the latter +/// is currently marked class Coordinator { public: - Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env, + Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, RuntimeProfile::EventSequence* events); ~Coordinator(); @@ -103,7 +111,7 @@ class Coordinator { /// Populates and prepares output_expr_ctxs from the coordinator's fragment if there is /// one, and LLVM optimizes them together with the fragment's other exprs. /// A call to Exec() must precede all other member function calls. - Status Exec(QuerySchedule& schedule, std::vector<ExprContext*>* output_expr_ctxs); + Status Exec(std::vector<ExprContext*>* output_expr_ctxs); /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the /// query doesn't return rows, until the query finishes or is cancelled. @@ -207,6 +215,7 @@ class Coordinator { boost::accumulators::tag::variance> > SummaryStats; + const QuerySchedule schedule_; ExecEnv* exec_env_; TUniqueId query_id_; @@ -229,7 +238,9 @@ class Coordinator { CounterMap scan_ranges_complete_counters; }; - /// FragmentInstanceState owned by obj_pool() + /// FragmentInstanceStates for all fragment instances, including that of the coordinator + /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in + /// PrepareCoordFragment() and StartRemoteFragments()/MtStartRemoteFInstances(). std::vector<FragmentInstanceState*> fragment_instance_states_; /// True if the query needs a post-execution step to tidy up @@ -249,6 +260,9 @@ class Coordinator { /// Protects all fields below. This is held while making RPCs, so this lock should /// only be acquired if the acquiring thread is prepared to wait for a significant /// time. + /// Lock ordering is + /// 1. lock_ + /// 2. FragmentInstanceState::lock_ boost::mutex lock_; /// Overall status of the entire query; set to the first reported fragment error @@ -286,10 +300,10 @@ class Coordinator { /// Number of remote fragments that have completed int num_remote_fragements_complete_; - /// If there is no coordinator fragment, Wait simply waits until all - /// backends report completion by notifying on backend_completion_cv_. + /// If there is no coordinator fragment, Wait() simply waits until all + /// backends report completion by notifying on instance_completion_cv_. /// Tied to lock_. - boost::condition_variable backend_completion_cv_; + boost::condition_variable instance_completion_cv_; /// Count of the number of backends for which done != true. When this /// hits 0, any Wait()'ing thread is notified @@ -349,9 +363,12 @@ class Coordinator { /// Execution rates for instances of this fragment SummaryStats rates; + + PerFragmentProfileData() + : averaged_profile(nullptr), num_instances(-1), root_profile(nullptr) {} }; - /// This is indexed by fragment_idx. + /// This is indexed by fragment idx (TPlanFragment.idx). /// This array is only modified at coordinator startup and query completion and /// does not need locks. std::vector<PerFragmentProfileData> fragment_profiles_; @@ -374,7 +391,9 @@ class Coordinator { TPlanNodeId node_id; bool is_local; bool is_bound_by_partition_columns; - boost::unordered_set<int> fragment_instance_idxs; + + // indices into fragment_instance_states_ + boost::unordered_set<int> fragment_instance_state_idxs; FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) { node_id = tFilterTarget.node_id; @@ -415,24 +434,22 @@ class Coordinator { void MarkFilterRoutingTableComplete(); /// Fill in rpc_params based on parameters. - /// 'instance_state_idx' is the index of the fragment instance state in - /// fragment_instance_states_. - /// 'fragment_idx' is the 0-based query-wide ordinal of the fragment of which it is an - /// instance. - /// 'fragment_instance_idx' is the 0-based ordinal of this particular fragment + /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment /// instance within its fragment. - void SetExecPlanFragmentParams(QuerySchedule& schedule, const TPlanFragment& fragment, - const FragmentExecParams& params, int instance_state_idx, int fragment_idx, - int fragment_instance_idx, const TNetworkAddress& coord, + void SetExecPlanFragmentParams(const TPlanFragment& fragment, + const FragmentExecParams& params, int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params); + void MtSetExecPlanFragmentParams( + const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params); /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from /// multiple threads. Creates a new FragmentInstanceState and registers it in /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad. - void ExecRemoteFragment(const FragmentExecParams* fragment_exec_params, - const TPlanFragment* plan_fragment, DebugOptions* debug_options, - QuerySchedule* schedule, int instance_state_idx, int fragment_idx, + void ExecRemoteFragment(const FragmentExecParams& fragment_exec_params, + const TPlanFragment& plan_fragment, DebugOptions* debug_options, int fragment_instance_idx); + void MtExecRemoteFInstance( + const FInstanceExecParams& exec_params, const DebugOptions* debug_options); /// Determine fragment number, given fragment id. int GetFragmentNum(const TUniqueId& fragment_id); @@ -441,21 +458,10 @@ class Coordinator { /// Attaches split size summary to the appropriate runtime profile void PrintFragmentInstanceInfo(); - /// Create aggregate counters for all scan nodes in any of the fragments - void CreateAggregateCounters(const std::vector<TPlanFragment>& fragments); - /// Collect scan node counters from the profile. /// Assumes lock protecting profile and result is held. void CollectScanNodeCounters(RuntimeProfile*, FragmentInstanceCounters* result); - /// Derived counter function: aggregates throughput for node_id across all fragment - /// instances (id needs to be for a ScanNode). - int64_t ComputeTotalThroughput(int node_id); - - /// Derived counter function: aggregates total completed scan ranges for node_id - /// across all fragment instances (id needs to be for a ScanNode). - int64_t ComputeTotalScanRangesComplete(int node_id); - /// Runs cancel logic. Assumes that lock_ is held. void CancelInternal(); @@ -475,9 +481,9 @@ class Coordinator { /// Returns only when either all fragment instances have reported success or the query /// is in error. Returns the status of the query. /// It is safe to call this concurrently, but any calls must be made only after Exec(). - /// WaitForAllBackends may be called before Wait(), but note that Wait() guarantees + /// WaitForAllInstances may be called before Wait(), but note that Wait() guarantees /// that any coordinator fragment has finished, which this method does not. - Status WaitForAllBackends(); + Status WaitForAllInstances(); /// Perform any post-query cleanup required. Called by Wait() only after all fragment /// instances have returned, or if the query has failed, in which case it only cleans up @@ -490,17 +496,17 @@ class Coordinator { /// Initializes the structures in runtime profile and exec_summary_. Must be /// called before RPCs to start remote fragments. void InitExecProfile(const TQueryExecRequest& request); + void MtInitExecProfiles(); + + /// Initialize the structures to collect execution summary of every plan node + /// (exec_summary_ and plan_node_id_to_summary_map_) + void MtInitExecSummary(); /// Update fragment profile information from a fragment instance state. - /// This is called repeatedly from UpdateFragmentExecStatus(), - /// and also at the end of the query from ReportQuerySummary(). - /// This method calls UpdateAverage() and AddChild(), which obtain their own locks - /// on the instance state. void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state); /// Compute the summary stats (completion_time and rates) - /// for an individual fragment_profile_ based on the specified backed_exec_state. - /// Called only from ReportQuerySummary() below. + /// for an individual fragment_profile_ based on the specified instance state. void ComputeFragmentSummaryStats(FragmentInstanceState* fragment_instance_state); /// Outputs aggregate query profile summary. This is assumed to be called at the end of @@ -509,7 +515,7 @@ class Coordinator { /// Populates the summary execution stats from the profile. Can only be called when the /// query is done. - void UpdateExecSummary(int fragment_idx, int instance_idx, RuntimeProfile* profile); + void UpdateExecSummary(const FragmentInstanceState& instance_state); /// Determines what the permissions of directories created by INSERT statements should /// be if permission inheritance is enabled. Populates a map from all prefixes of @@ -532,10 +538,23 @@ class Coordinator { /// SubplanNode with respect to setting collection-slots to NULL. void ValidateCollectionSlots(RowBatch* batch); + /// Prepare coordinator fragment for execution (update filter routing table, + /// prepare executor, set up output exprs) and create its FragmentInstanceState. + Status PrepareCoordFragment(std::vector<ExprContext*>* output_expr_ctxs); + /// Starts all remote fragments contained in the schedule by issuing RPCs in parallel, - /// and then waiting for all of the RPCs to complete. Returns an error if there was any - /// error starting the fragments. - Status StartRemoteFragments(QuerySchedule* schedule); + /// and then waiting for all of the RPCs to complete. + void StartRemoteFragments(); + + /// Starts all remote fragment instances contained in the schedule by issuing RPCs in + /// parallel and then waiting for all of the RPCs to complete. Also sets up and + /// registers the state for all non-coordinator fragment instance. + void MtStartRemoteFInstances(); + + /// Calls CancelInternal() and returns an error if there was any error starting the + /// fragments. + /// Also updates query_profile_ with the startup latency histogram. + Status FinishRemoteInstanceStartup(); /// Build the filter routing table by iterating over all plan nodes and collecting the /// filters that they either produce or consume. The source and target fragment http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index a25bf8d..53755f9 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -81,11 +81,13 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte } else { if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) { consumed_filters_[filter_desc.filter_id] = ret; + VLOG_QUERY << "registered consumer filter " << filter_desc.filter_id; } else { // The filter has already been registered in this filter bank by another // target node. DCHECK_GT(filter_desc.targets.size(), 1); ret = consumed_filters_[filter_desc.filter_id]; + VLOG_QUERY << "re-registered consumer filter " << filter_desc.filter_id; } } return ret;
