IMPALA-4314: Standardize on MT-related data structures

This removes the data structures that were superseded in IMPALA-3903 and changes all control flow to use the new data structures. The new data structures are renamed to drop the "Mt" prefix.
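A simplified sketch of the standardized structures this change converges on, with the Thrift-generated types reduced to stand-ins (the real definitions are in be/src/scheduling/query-schedule.h in the diff below; destinations, per_exch_num_senders and scan_range_assignment are omitted here for brevity):

#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Stand-ins for the Thrift-generated types used by the real headers
// (TNetworkAddress, TUniqueId, TPlanFragment, TScanRangeParams, ...).
struct TNetworkAddress { std::string hostname; int port; };
struct TUniqueId { int64_t hi; int64_t lo; };
struct TPlanFragment { int idx; };
typedef int TPlanNodeId;
typedef int FragmentIdx;
typedef std::map<TPlanNodeId, std::vector<int>> PerNodeScanRanges;

struct FragmentExecParams;  // formerly MtFragmentExecParams

// Execution parameters for a single fragment instance; used to assemble the
// TPlanFragmentInstanceCtx sent in the ExecPlanFragment() RPC.
struct FInstanceExecParams {
  TUniqueId instance_id;
  TNetworkAddress host;             // host the instance runs on
  PerNodeScanRanges per_node_scan_ranges;
  int per_fragment_instance_idx;    // 0-based ordinal within the fragment
  int sender_id;                    // data-sender id; -1 = invalid
  const FragmentExecParams& fragment_exec_params;  // parent fragment's params
};

// Execution parameters shared by all instances of one fragment; this is the
// struct that carried the "Mt" prefix before this change.
struct FragmentExecParams {
  bool is_coord_fragment;
  const TPlanFragment& fragment;
  std::vector<FragmentIdx> input_fragments;
  std::vector<FInstanceExecParams> instance_exec_params;

  // Instance indices derived from instance_exec_params[*].instance_id.
  std::vector<int> GetInstanceIdxs() const;
};

Compared with the removed single-threaded FragmentExecParams, which kept parallel hosts/instance_ids vectors indexed by host, each instance now carries its own parameters plus a back-reference to its fragment, so the coordinator no longer reconstructs per-instance state from positional indices.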
Change-Id: I465d0e15e2cf17cafe4c747d34c8f595d3645151 Reviewed-on: http://gerrit.cloudera.org:8080/4853 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d857237 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d857237 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d857237 Branch: refs/heads/master Commit: 0d857237a8d5231caf9af4c30d4134b151cd1e0d Parents: 29faca5 Author: Marcel Kornacker <[email protected]> Authored: Tue Oct 4 20:40:18 2016 -0400 Committer: Tim Armstrong <[email protected]> Committed: Mon Oct 31 16:03:32 2016 +0000 ---------------------------------------------------------------------- be/src/benchmarks/expr-benchmark.cc | 2 +- be/src/runtime/coordinator.cc | 415 ++++-------------- be/src/runtime/coordinator.h | 71 +--- be/src/scheduling/query-schedule.cc | 175 ++++---- be/src/scheduling/query-schedule.h | 77 ++-- be/src/scheduling/simple-scheduler-test-util.cc | 10 +- be/src/scheduling/simple-scheduler-test-util.h | 14 +- be/src/scheduling/simple-scheduler.cc | 417 +++++++------------ be/src/scheduling/simple-scheduler.h | 81 ++-- be/src/service/impala-http-handler.cc | 7 +- be/src/service/impala-server.cc | 6 +- be/src/service/query-exec-state.cc | 8 +- common/thrift/Frontend.thrift | 50 +-- common/thrift/Planner.thrift | 4 +- .../impala/planner/DataSourceScanNode.java | 4 +- .../apache/impala/planner/HBaseScanNode.java | 6 +- .../org/apache/impala/planner/HdfsScanNode.java | 6 +- .../org/apache/impala/planner/KuduScanNode.java | 4 +- .../java/org/apache/impala/planner/Planner.java | 4 +- .../org/apache/impala/planner/ScanNode.java | 6 +- .../org/apache/impala/service/Frontend.java | 187 ++------- .../apache/impala/planner/PlannerTestBase.java | 187 +++++---- .../queries/PlannerTest/mt-dop-validation.test | 50 +-- 23 files changed, 627 insertions(+), 1164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/benchmarks/expr-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc index b95e6d7..444df3a 100644 --- a/be/src/benchmarks/expr-benchmark.cc +++ b/be/src/benchmarks/expr-benchmark.cc @@ -111,7 +111,7 @@ MemTracker tracker; // constant query static Status PrepareSelectList(const TExecRequest& request, ExprContext** ctx) { const TQueryExecRequest& query_request = request.query_exec_request; - vector<TExpr> texprs = query_request.fragments[0].output_exprs; + vector<TExpr> texprs = query_request.plan_exec_info[0].fragments[0].output_exprs; DCHECK_EQ(texprs.size(), 1); RETURN_IF_ERROR(Expr::CreateExprTree(&pool, texprs[0], ctx)); RETURN_IF_ERROR((*ctx)->Prepare(NULL, RowDescriptor(), &tracker)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 7df0bf0..f719e98 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -125,39 +125,17 @@ struct DebugOptions { /// - updates through UpdateFragmentExecStatus() class Coordinator::FragmentInstanceState { public: - // TODO-MT: remove this c'tor - 
FragmentInstanceState(FragmentIdx fragment_idx, const FragmentExecParams& params, - int per_fragment_instance_idx, ObjectPool* obj_pool) - : fragment_instance_id_(params.instance_ids[per_fragment_instance_idx]), - fragment_idx_(fragment_idx), - per_fragment_instance_idx_(per_fragment_instance_idx), - impalad_address_(params.hosts[per_fragment_instance_idx]), - total_split_size_(0), - rpc_sent_(false), - done_(false), - profile_created_(false), - profile_(NULL), - total_ranges_complete_(0), - rpc_latency_(0) { - const string& profile_name = Substitute("Instance $0 (host=$1)", - PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_)); - profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); - } - FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) - : fragment_instance_id_(params.instance_id), - fragment_idx_(params.fragment().idx), - per_fragment_instance_idx_(params.per_fragment_instance_idx), - impalad_address_(params.host), + : exec_params_(params), total_split_size_(0), + profile_(nullptr), + total_ranges_complete_(0), + rpc_latency_(0), rpc_sent_(false), done_(false), - profile_created_(false), - profile_(NULL), - total_ranges_complete_(0), - rpc_latency_(0) { + profile_created_(false) { const string& profile_name = Substitute("Instance $0 (host=$1)", - PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_)); + PrintId(params.instance_id), lexical_cast<string>(params.host)); profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); } @@ -182,13 +160,13 @@ class Coordinator::FragmentInstanceState { int64_t UpdateNumScanRangesCompleted(); // The following getters do not require lock() to be held. - const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; } - FragmentIdx fragment_idx() const { return fragment_idx_; } + const TUniqueId& fragment_instance_id() const { return exec_params_.instance_id; } + FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; } MonotonicStopWatch* stopwatch() { return &stopwatch_; } - const TNetworkAddress& impalad_address() const { return impalad_address_; } + const TNetworkAddress& impalad_address() const { return exec_params_.host; } int64_t total_split_size() const { return total_split_size_; } bool done() const { return done_; } - int per_fragment_instance_idx() const { return per_fragment_instance_idx_; } + int per_fragment_instance_idx() const { return exec_params_.per_fragment_instance_idx; } bool rpc_sent() const { return rpc_sent_; } int64_t rpc_latency() const { return rpc_latency_; } @@ -218,22 +196,11 @@ class Coordinator::FragmentInstanceState { } private: - /// The unique ID of this instance of this fragment (there may be many instance of the - /// same fragment, but this ID uniquely identifies this FragmentInstanceState). - TUniqueId fragment_instance_id_; - - // Same as TPlanFragment.idx - FragmentIdx fragment_idx_; - - /// range: 0..<# instances of this fragment>-1 - int per_fragment_instance_idx_; + const FInstanceExecParams& exec_params_; /// Wall clock timer for this fragment. MonotonicStopWatch stopwatch_; - /// Address of ImpalaInternalService this fragment is running on. - const TNetworkAddress impalad_address_; - /// Summed across all splits; in bytes. int64_t total_split_size_; @@ -246,16 +213,6 @@ class Coordinator::FragmentInstanceState { /// been initiated; either way, execution must not be cancelled. 
Status status_; - /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be - /// successful. - bool rpc_sent_; - - /// If true, execution terminated; do not cancel in that case. - bool done_; - - /// True after the first call to profile->Update() - bool profile_created_; - /// Owned by coordinator object pool provided in the c'tor RuntimeProfile* profile_; @@ -270,6 +227,32 @@ class Coordinator::FragmentInstanceState { /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC. int64_t rpc_latency_; + + /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be + /// successful. + bool rpc_sent_; + + /// If true, execution terminated; do not cancel in that case. + bool done_; + + /// True after the first call to profile->Update() + bool profile_created_; +}; + +/// Represents a runtime filter target. +struct Coordinator::FilterTarget { + TPlanNodeId node_id; + bool is_local; + bool is_bound_by_partition_columns; + + // indices into fragment_instance_states_ + unordered_set<int> fragment_instance_state_idxs; + + FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) { + node_id = tFilterTarget.node_id; + is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns; + is_local = tFilterTarget.is_local_target; + } }; @@ -410,7 +393,6 @@ TExecNodePhase::type GetExecNodePhase(const string& key) { return TExecNodePhase::INVALID; } -// TODO: templatize this TDebugAction::type GetDebugAction(const string& key) { map<int, const char*>::const_iterator entry = _TDebugAction_VALUES_TO_NAMES.begin(); @@ -451,7 +433,7 @@ static void ProcessQueryOptions( Status Coordinator::Exec() { const TQueryExecRequest& request = schedule_.request(); - DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0); + DCHECK(request.plan_exec_info.size() > 0); needs_finalization_ = request.__isset.finalize_params; if (needs_finalization_) finalize_params_ = request.finalize_params; @@ -474,6 +456,10 @@ Status Coordinator::Exec() { const string& str = Substitute("Query $0", PrintId(query_id_)); progress_.Init(str, schedule_.num_scan_ranges()); + // runtime filters not yet supported for mt execution + bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; + if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF; + // to keep things simple, make async Cancel() calls wait until plan fragment // execution has been initiated, otherwise we might try to cancel fragment // execution at Impala daemons where it hasn't even started @@ -494,16 +480,9 @@ Status Coordinator::Exec() { filter_mem_tracker_.reset( new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false)); - // Initialize the execution profile structures. 
- bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; - if (is_mt_execution) { - MtInitExecProfiles(); - MtInitExecSummary(); - MtStartFInstances(); - } else { - InitExecProfile(request); - StartFragments(); - } + InitExecProfiles(); + InitExecSummary(); + StartFInstances(); // In the error case, it's safe to return and not to get root_sink_ here to close - if // there was an error, but the coordinator fragment was successfully started, it should @@ -543,13 +522,16 @@ Status Coordinator::Exec() { return Status::OK(); } -void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, - int num_hosts, int start_fragment_instance_state_idx) { +void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) { + DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0); + int num_hosts = fragment_params.instance_exec_params.size(); + DCHECK_GT(num_hosts, 0); DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) << "UpdateFilterRoutingTable() called although runtime filters are disabled"; DCHECK(!filter_routing_table_complete_) << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_"; - for (const TPlanNode& plan_node: plan_nodes) { + + for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) { if (!plan_node.__isset.runtime_filters) continue; for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) { if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) { @@ -564,10 +546,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, int pending_count = filter.is_broadcast_join ? (filter.has_remote_targets ? 1 : 0) : num_hosts; f->set_pending_count(pending_count); - vector<int> src_idxs; - for (int i = 0; i < num_hosts; ++i) { - src_idxs.push_back(start_fragment_instance_state_idx + i); - } + vector<int> src_idxs = fragment_params.GetInstanceIdxs(); // If this is a broadcast join with only non-local targets, build and publish it // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join @@ -586,11 +565,9 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) { continue; } + vector<int> idxs = fragment_params.GetInstanceIdxs(); FilterTarget target(tFilterTarget); - for (int i = 0; i < num_hosts; ++i) { - target.fragment_instance_state_idxs.insert( - start_fragment_instance_state_idx + i); - } + target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end()); f->targets()->push_back(target); } else { DCHECK(false) << "Unexpected plan node with runtime filters: " @@ -600,7 +577,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes, } } -void Coordinator::StartFragments() { +void Coordinator::StartFInstances() { int num_fragment_instances = schedule_.GetNumFragmentInstances(); DCHECK_GT(num_fragment_instances, 0); @@ -615,78 +592,28 @@ void Coordinator::StartFragments() { VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " << query_id_; query_events_->MarkEvent( - Substitute("Ready to start $0 fragments", num_fragment_instances)); + Substitute("Ready to start $0 fragment instances", num_fragment_instances)); + + // TODO-MT: populate the runtime filter routing table + // This requires local aggregation of filters prior to sending + // for broadcast joins in order to avoid more complicated merge logic here. 
- int instance_state_idx = 0; if (filter_mode_ != TRuntimeFilterMode::OFF) { + DCHECK_EQ(request.plan_exec_info.size(), 1); // Populate the runtime filter routing table. This should happen before starting the // fragment instances. This code anticipates the indices of the instance states // created later on in ExecRemoteFragment() - for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) { - const FragmentExecParams& params = schedule_.exec_params()[fragment_idx]; - int num_hosts = params.hosts.size(); - DCHECK_GT(num_hosts, 0); - UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts, - instance_state_idx); - instance_state_idx += num_hosts; + for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { + UpdateFilterRoutingTable(fragment_params); } MarkFilterRoutingTableComplete(); } int num_instances = 0; - // Start one fragment instance per fragment per host (number of hosts running each - // fragment may not be constant). - for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) { - const FragmentExecParams& params = schedule_.exec_params()[fragment_idx]; - int num_hosts = params.hosts.size(); - DCHECK_GT(num_hosts, 0); - fragment_profiles_[fragment_idx].num_instances = num_hosts; - // Start one fragment instance for every fragment_instance required by the - // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with - // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and - // so on. - for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts; - ++fragment_instance_idx, ++num_instances) { - DebugOptions* fragment_instance_debug_options = - debug_options.IsApplicable(instance_state_idx) ? 
&debug_options : NULL; - exec_env_->fragment_exec_thread_pool()->Offer( - std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params), - std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options, - fragment_instance_idx)); - } - } - exec_complete_barrier_->Wait(); - query_events_->MarkEvent( - Substitute("All $0 fragments instances started", num_instances)); -} - -void Coordinator::MtStartFInstances() { - int num_fragment_instances = schedule_.GetNumFragmentInstances(); - DCHECK_GT(num_fragment_instances, 0); - - fragment_instance_states_.resize(num_fragment_instances); - exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances)); - num_remaining_fragment_instances_ = num_fragment_instances; - - DebugOptions debug_options; - ProcessQueryOptions(schedule_.query_options(), &debug_options); - const TQueryExecRequest& request = schedule_.request(); - - VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " - << query_id_; - query_events_->MarkEvent( - Substitute("Ready to start $0 fragment instances", num_fragment_instances)); - - // TODO: populate the runtime filter routing table - // this requires local aggregation of filters prior to sending - // for broadcast joins in order to avoid more complicated merge logic here - - int num_instances = 0; - for (const MtFragmentExecParams& fragment_params: schedule_.mt_fragment_exec_params()) { - for (int i = 0; i < fragment_params.instance_exec_params.size(); - ++i, ++num_instances) { - const FInstanceExecParams& instance_params = - fragment_params.instance_exec_params[i]; + for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { + num_instances += fragment_params.instance_exec_params.size(); + for (const FInstanceExecParams& instance_params: + fragment_params.instance_exec_params) { FragmentInstanceState* exec_state = obj_pool()->Add( new FragmentInstanceState(instance_params, obj_pool())); int instance_state_idx = GetInstanceIdx(instance_params.instance_id); @@ -695,7 +622,7 @@ void Coordinator::MtStartFInstances() { DebugOptions* instance_debug_options = debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL; exec_env_->fragment_exec_thread_pool()->Offer( - std::bind(&Coordinator::MtExecRemoteFInstance, + std::bind(&Coordinator::ExecRemoteFInstance, this, std::cref(instance_params), instance_debug_options)); } } @@ -1243,79 +1170,12 @@ void Coordinator::PrintFragmentInstanceInfo() { } } -void Coordinator::InitExecProfile(const TQueryExecRequest& request) { - // Initialize the structure to collect execution summary of every plan node. 
- fragment_profiles_.resize(request.fragments.size()); - exec_summary_.__isset.nodes = true; - for (int i = 0; i < request.fragments.size(); ++i) { - if (!request.fragments[i].__isset.plan) continue; - const TPlan& plan = request.fragments[i].plan; - int fragment_first_node_idx = exec_summary_.nodes.size(); - - for (int j = 0; j < plan.nodes.size(); ++j) { - TPlanNodeExecSummary node; - node.node_id = plan.nodes[j].node_id; - node.fragment_idx = i; - node.label = plan.nodes[j].label; - node.__set_label_detail(plan.nodes[j].label_detail); - node.num_children = plan.nodes[j].num_children; - - if (plan.nodes[j].__isset.estimated_stats) { - node.__set_estimated_stats(plan.nodes[j].estimated_stats); - } - - plan_node_id_to_summary_map_[plan.nodes[j].node_id] = exec_summary_.nodes.size(); - exec_summary_.nodes.push_back(node); - } - - if (request.fragments[i].__isset.output_sink && - request.fragments[i].output_sink.type == TDataSinkType::DATA_STREAM_SINK) { - const TDataStreamSink& sink = request.fragments[i].output_sink.stream_sink; - int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id]; - if (sink.output_partition.type == TPartitionType::UNPARTITIONED) { - exec_summary_.nodes[exch_idx].__set_is_broadcast(true); - } - exec_summary_.__isset.exch_to_sender_map = true; - exec_summary_.exch_to_sender_map[exch_idx] = fragment_first_node_idx; - } - } - - // Initialize the runtime profile structure. This adds the per fragment average - // profiles followed by the per fragment instance profiles. - for (int i = 0; i < request.fragments.size(); ++i) { - // Insert the avg profiles in ascending fragment number order. If there is a - // coordinator fragment, it's been placed in fragment_profiles_[0].averaged_profile, - // ensuring that this code will put the first averaged profile immediately after - // it. If there is no coordinator fragment, the first averaged profile will be - // inserted as the first child of query_profile_, and then all other averaged - // fragments will follow. - bool is_coordinator_fragment = (i == 0 && schedule_.GetCoordFragment() != nullptr); - string profile_name = - Substitute(is_coordinator_fragment ? "Coordinator Fragment $0" : "Fragment $0", - request.fragments[i].display_name); - fragment_profiles_[i].root_profile = - obj_pool()->Add(new RuntimeProfile(obj_pool(), profile_name)); - if (is_coordinator_fragment) { - fragment_profiles_[i].averaged_profile = nullptr; - } else { - fragment_profiles_[i].averaged_profile = obj_pool()->Add(new RuntimeProfile( - obj_pool(), - Substitute("Averaged Fragment $0", request.fragments[i].display_name), true)); - query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true, - (i > 0) ? fragment_profiles_[i - 1].averaged_profile : NULL); - } - // Note: we don't start the wall timer here for the fragment - // profile; it's uninteresting and misleading. 
- query_profile_->AddChild(fragment_profiles_[i].root_profile); - } -} - -void Coordinator::MtInitExecSummary() { +void Coordinator::InitExecSummary() { const TQueryExecRequest& request = schedule_.request(); // init exec_summary_.{nodes, exch_to_sender_map} exec_summary_.__isset.nodes = true; DCHECK(exec_summary_.nodes.empty()); - for (const TPlanExecInfo& plan_exec_info: request.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) { for (const TPlanFragment& fragment: plan_exec_info.fragments) { if (!fragment.__isset.plan) continue; @@ -1354,7 +1214,7 @@ void Coordinator::MtInitExecSummary() { } } -void Coordinator::MtInitExecProfiles() { +void Coordinator::InitExecProfiles() { const TQueryExecRequest& request = schedule_.request(); vector<const TPlanFragment*> fragments; schedule_.GetTPlanFragments(&fragments); @@ -1370,6 +1230,7 @@ void Coordinator::MtInitExecProfiles() { PerFragmentProfileData* data = &fragment_profiles_[fragment->idx]; data->num_instances = schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size(); + // TODO-MT: stop special-casing the coordinator fragment if (fragment != coord_fragment) { data->averaged_profile = obj_pool()->Add(new RuntimeProfile( obj_pool(), Substitute("Averaged Fragment $0", fragment->display_name), true)); @@ -1383,7 +1244,6 @@ void Coordinator::MtInitExecProfiles() { } } - void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, FragmentInstanceCounters* counters) { vector<RuntimeProfile*> children; @@ -1407,11 +1267,11 @@ void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, } } -void Coordinator::MtExecRemoteFInstance( +void Coordinator::ExecRemoteFInstance( const FInstanceExecParams& exec_params, const DebugOptions* debug_options) { NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); TExecPlanFragmentParams rpc_params; - MtSetExecPlanFragmentParams(exec_params, &rpc_params); + SetExecPlanFragmentParams(exec_params, &rpc_params); if (debug_options != NULL) { rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); @@ -1467,70 +1327,6 @@ void Coordinator::MtExecRemoteFInstance( << " instance_id=" << PrintId(exec_state->fragment_instance_id()); } -void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_params, - const TPlanFragment& plan_fragment, DebugOptions* debug_options, - int fragment_instance_idx) { - NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); - TExecPlanFragmentParams rpc_params; - SetExecPlanFragmentParams( - plan_fragment, fragment_exec_params, fragment_instance_idx, &rpc_params); - if (debug_options != NULL) { - rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); - rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); - rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); - } - FragmentInstanceState* exec_state = obj_pool()->Add( - new FragmentInstanceState( - plan_fragment.idx, fragment_exec_params, fragment_instance_idx, obj_pool())); - exec_state->ComputeTotalSplitSize( - rpc_params.fragment_instance_ctx.per_node_scan_ranges); - int instance_state_idx = GetInstanceIdx(exec_state->fragment_instance_id()); - fragment_instance_states_[instance_state_idx] = exec_state; - VLOG_FILE << "making rpc: ExecPlanFragment" - << " instance_id=" << PrintId(exec_state->fragment_instance_id()) - << " host=" << exec_state->impalad_address(); - - int64_t start = 
MonotonicMillis(); - - Status client_connect_status; - ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(), - exec_state->impalad_address(), &client_connect_status); - if (!client_connect_status.ok()) { - exec_state->SetInitialStatus(client_connect_status, true); - return; - } - - TExecPlanFragmentResult thrift_result; - Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment, - rpc_params, &thrift_result); - - exec_state->set_rpc_latency(MonotonicMillis() - start); - - const string ERR_TEMPLATE = "ExecPlanRequest rpc instance_id=$0 failed: $1"; - - if (!rpc_status.ok()) { - const string& err_msg = - Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()), - rpc_status.msg().msg()); - VLOG_QUERY << err_msg; - exec_state->SetInitialStatus(Status(err_msg), true); - return; - } - - Status exec_plan_status = Status(thrift_result.status); - if (!exec_plan_status.ok()) { - const string& err_msg = - Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()), - exec_plan_status.msg().GetFullMessageDetails()); - VLOG_QUERY << err_msg; - exec_state->SetInitialStatus(Status(err_msg), true); - return; - } - - exec_state->SetInitialStatus(Status::OK(), true); - return; -} - void Coordinator::Cancel(const Status* cause) { lock_guard<mutex> l(lock_); // if the query status indicates an error, cancellation has already been initiated @@ -1834,14 +1630,6 @@ void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) TPlanNodeExecSummary& exec_summary = exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]]; - if (exec_summary.exec_stats.empty()) { - // First time, make an exec_stats for each instance this plan node is running on. - // TODO-MT: remove this and initialize all runtime state prior to starting - // instances - DCHECK_LT(instance_state.fragment_idx(), fragment_profiles_.size()); - exec_summary.exec_stats.resize( - fragment_profiles_[instance_state.fragment_idx()].num_instances); - } DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size()); DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances, exec_summary.exec_stats.size()); @@ -1956,7 +1744,7 @@ string Coordinator::GetErrorLog() { return PrintErrorMapToString(merged); } -void Coordinator::MtSetExecPlanFragmentParams( +void Coordinator::SetExecPlanFragmentParams( const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) { rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); rpc_params->__set_query_ctx(query_ctx_); @@ -1965,35 +1753,12 @@ void Coordinator::MtSetExecPlanFragmentParams( TPlanFragmentInstanceCtx fragment_instance_ctx; fragment_ctx.__set_fragment(params.fragment()); - // TODO: Remove filters that weren't selected during filter routing table construction. 
SetExecPlanDescriptorTable(params.fragment(), rpc_params); - fragment_instance_ctx.__set_request_pool(schedule_.request_pool()); - fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); - fragment_instance_ctx.__set_per_exch_num_senders( - params.fragment_exec_params.per_exch_num_senders); - fragment_instance_ctx.__set_destinations( - params.fragment_exec_params.destinations); - fragment_instance_ctx.__set_sender_id(params.sender_id); - fragment_instance_ctx.fragment_instance_id = params.instance_id; - fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; - rpc_params->__set_fragment_ctx(fragment_ctx); - rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); -} - -void Coordinator::SetExecPlanFragmentParams( - const TPlanFragment& fragment, const FragmentExecParams& params, - int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params) { - rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); - rpc_params->__set_query_ctx(query_ctx_); - - TPlanFragmentCtx fragment_ctx; - TPlanFragmentInstanceCtx fragment_instance_ctx; - - fragment_ctx.__set_fragment(fragment); - int instance_state_idx = GetInstanceIdx(params.instance_ids[per_fragment_instance_idx]); // Remove filters that weren't selected during filter routing table construction. if (filter_mode_ != TRuntimeFilterMode::OFF) { + DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0); + int instance_idx = GetInstanceIdx(params.instance_id); for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) { if (plan_node.__isset.runtime_filters) { vector<TRuntimeFilterDesc> required_filters; @@ -2003,7 +1768,7 @@ void Coordinator::SetExecPlanFragmentParams( if (filter_it == filter_routing_table_.end()) continue; const FilterState& f = filter_it->second; if (plan_node.__isset.hash_join_node) { - if (f.src_fragment_instance_state_idxs().find(instance_state_idx) == + if (f.src_fragment_instance_state_idxs().find(instance_idx) == f.src_fragment_instance_state_idxs().end()) { DCHECK(desc.is_broadcast_join); continue; @@ -2018,24 +1783,16 @@ void Coordinator::SetExecPlanFragmentParams( } } } - SetExecPlanDescriptorTable(fragment, rpc_params); - - TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx]; - FragmentScanRangeAssignment::const_iterator it = - params.scan_range_assignment.find(exec_host); - // Scan ranges may not always be set, so use an empty structure if so. - const PerNodeScanRanges& scan_ranges = - (it != params.scan_range_assignment.end()) ? 
it->second : PerNodeScanRanges(); fragment_instance_ctx.__set_request_pool(schedule_.request_pool()); - fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges); - fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders); - fragment_instance_ctx.__set_destinations(params.destinations); - fragment_instance_ctx.__set_sender_id( - params.sender_id_base + per_fragment_instance_idx); - fragment_instance_ctx.fragment_instance_id = - params.instance_ids[per_fragment_instance_idx]; - fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx; + fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); + fragment_instance_ctx.__set_per_exch_num_senders( + params.fragment_exec_params.per_exch_num_senders); + fragment_instance_ctx.__set_destinations( + params.fragment_exec_params.destinations); + fragment_instance_ctx.__set_sender_id(params.sender_id); + fragment_instance_ctx.fragment_instance_id = params.instance_id; + fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; rpc_params->__set_fragment_ctx(fragment_ctx); rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 176d89d..53f00eb 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -100,10 +100,6 @@ struct DebugOptions; /// /// TODO: move into separate subdirectory and move nested classes into separate files /// and unnest them -/// -/// TODO: remove all data structures and functions that are superceded by their -/// multi-threaded counterpart and remove the "Mt" prefix with which the latter -/// is currently marked class Coordinator { public: Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, @@ -216,7 +212,8 @@ class Coordinator { private: class FragmentInstanceState; - struct FilterState; + struct FilterTarget; + class FilterState; /// Typedef for boost utility to compute averaged stats /// TODO: including the median doesn't compile, looks like some includes are missing @@ -253,7 +250,7 @@ class Coordinator { /// FragmentInstanceStates for all fragment instances, including that of the coordinator /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in - /// PrepareCoordFragment() and StartRemoteFragments()/MtStartRemoteFInstances(). + /// StartFInstances(). std::vector<FragmentInstanceState*> fragment_instance_states_; /// True if the query needs a post-execution step to tidy up @@ -412,22 +409,6 @@ class Coordinator { /// returned, successfully or not. Initialised during Exec(). boost::scoped_ptr<CountingBarrier> exec_complete_barrier_; - /// Represents a runtime filter target. - struct FilterTarget { - TPlanNodeId node_id; - bool is_local; - bool is_bound_by_partition_columns; - - // indices into fragment_instance_states_ - boost::unordered_set<int> fragment_instance_state_idxs; - - FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) { - node_id = tFilterTarget.node_id; - is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns; - is_local = tFilterTarget.is_local_target; - } - }; - /// Protects filter_routing_table_. SpinLock filter_lock_; @@ -444,7 +425,7 @@ class Coordinator { RuntimeProfile::Counter* filter_updates_received_; /// The filtering mode for this query. Set in constructor. 
- const TRuntimeFilterMode::type filter_mode_; + TRuntimeFilterMode::type filter_mode_; /// Tracks the memory consumed by runtime filters during aggregation. Child of /// query_mem_tracker_. @@ -459,22 +440,13 @@ class Coordinator { /// Sets 'filter_routing_table_complete_' and prints the table to the profile and log. void MarkFilterRoutingTableComplete(); - /// Fill in rpc_params based on parameters. - /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment - /// instance within its fragment. - void SetExecPlanFragmentParams(const TPlanFragment& fragment, - const FragmentExecParams& params, int per_fragment_instance_idx, - TExecPlanFragmentParams* rpc_params); - void MtSetExecPlanFragmentParams( + /// Fill in rpc_params based on params. + void SetExecPlanFragmentParams( const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params); /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from - /// multiple threads. Creates a new FragmentInstanceState and registers it in - /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad. - void ExecRemoteFragment(const FragmentExecParams& fragment_exec_params, - const TPlanFragment& plan_fragment, DebugOptions* debug_options, - int fragment_instance_idx); - void MtExecRemoteFInstance( + /// multiple threads. + void ExecRemoteFInstance( const FInstanceExecParams& exec_params, const DebugOptions* debug_options); /// Determine fragment number, given fragment id. @@ -518,14 +490,13 @@ class Coordinator { /// Moves all temporary staging files to their final destinations. Status FinalizeSuccessfulInsert(); - /// Initializes the structures in runtime profile and exec_summary_. Must be - /// called before RPCs to start remote fragments. - void InitExecProfile(const TQueryExecRequest& request); - void MtInitExecProfiles(); + /// Initializes the structures in fragment_profiles_. Must be called before RPCs to + /// start remote fragments. + void InitExecProfiles(); /// Initialize the structures to collect execution summary of every plan node /// (exec_summary_ and plan_node_id_to_summary_map_) - void MtInitExecSummary(); + void InitExecSummary(); /// Update fragment profile information from a fragment instance state. void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state); @@ -556,13 +527,10 @@ class Coordinator { void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str, PermissionCache* permissions_cache); - /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel, - /// and then waiting for all of the RPCs to complete. - void StartFragments(); - - /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel - /// and then waiting for all of the RPCs to complete. - void MtStartFInstances(); + /// Starts all fragment instances contained in the schedule by issuing RPCs in + /// parallel and then waiting for all of the RPCs to complete. Also sets up and + /// registers the state for all fragment instances. + void StartFInstances(); /// Calls CancelInternal() and returns an error if there was any error starting the /// fragments. @@ -570,11 +538,8 @@ class Coordinator { Status FinishInstanceStartup(); /// Build the filter routing table by iterating over all plan nodes and collecting the - /// filters that they either produce or consume. The source and target fragment - /// instance indexes for filters are numbered in the range - /// [start_fragment_instance_idx .. 
start_fragment_instance_idx + num_hosts] - void UpdateFilterRoutingTable(const std::vector<TPlanNode>& plan_nodes, int num_hosts, - int start_fragment_instance_idx); + /// filters that they either produce or consume. + void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index 1eb36e3..e2dc7c4 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -50,39 +50,16 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id, query_options_(query_options), summary_profile_(summary_profile), query_events_(query_events), - num_fragment_instances_(0), num_scan_ranges_(0), next_instance_id_(query_id), is_admitted_(false) { - fragment_exec_params_.resize(request.fragments.size()); - bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; - - if (is_mt_execution) { - /// TODO-MT: remove else branch and move MtInit() logic here - MtInit(); - } else { - // Build two maps to map node ids to their fragments as well as to the offset in their - // fragment's plan's nodes list. - for (int i = 0; i < request.fragments.size(); ++i) { - int node_idx = 0; - for (const TPlanNode& node: request.fragments[i].plan.nodes) { - if (plan_node_to_fragment_idx_.size() < node.node_id + 1) { - plan_node_to_fragment_idx_.resize(node.node_id + 1); - plan_node_to_plan_node_idx_.resize(node.node_id + 1); - } - DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size()); - plan_node_to_fragment_idx_[node.node_id] = i; - plan_node_to_plan_node_idx_[node.node_id] = node_idx; - ++node_idx; - } - } - } + Init(); } -void QuerySchedule::MtInit() { - // extract TPlanFragments and order by fragment id +void QuerySchedule::Init() { + // extract TPlanFragments and order by fragment idx vector<const TPlanFragment*> fragments; - for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) { for (const TPlanFragment& fragment: plan_exec_info.fragments) { fragments.push_back(&fragment); } @@ -90,41 +67,34 @@ void QuerySchedule::MtInit() { sort(fragments.begin(), fragments.end(), [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; }); - DCHECK_EQ(mt_fragment_exec_params_.size(), 0); + // this must only be called once + DCHECK_EQ(fragment_exec_params_.size(), 0); for (const TPlanFragment* fragment: fragments) { - mt_fragment_exec_params_.emplace_back(*fragment); + fragment_exec_params_.emplace_back(*fragment); } // mark coordinator fragment - const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0]; - if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) { - mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true; - next_instance_id_.lo = 1; // generated instance ids start at 1 + const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0]; + if (request_.stmt_type == TStmtType::QUERY) { + fragment_exec_params_[root_fragment.idx].is_coord_fragment = true; + // the coordinator instance gets index 0, generated instance ids start at 1 + next_instance_id_.lo = 1; } - // compute input fragments and find max node id + // find max node id int max_node_id = 0; - for (const TPlanExecInfo& plan_exec_info: 
request_.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) { for (const TPlanFragment& fragment: plan_exec_info.fragments) { for (const TPlanNode& node: fragment.plan.nodes) { max_node_id = max(node.node_id, max_node_id); } } - - // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] - for (int i = 1; i < plan_exec_info.fragments.size(); ++i) { - const TPlanFragment& fragment = plan_exec_info.fragments[i]; - FragmentIdx dest_idx = - plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx; - MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx]; - dest_params.input_fragments.push_back(fragment.idx); - } } // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_ plan_node_to_fragment_idx_.resize(max_node_id + 1); plan_node_to_plan_node_idx_.resize(max_node_id + 1); - for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) { for (const TPlanFragment& fragment: plan_exec_info.fragments) { for (int i = 0; i < fragment.plan.nodes.size(); ++i) { const TPlanNode& node = fragment.plan.nodes[i]; @@ -133,8 +103,72 @@ void QuerySchedule::MtInit() { } } } + + // compute input fragments + for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) { + // each fragment sends its output to the fragment containing the destination node + // of its output sink + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + if (!fragment.output_sink.__isset.stream_sink) continue; + PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id; + FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id]; + FragmentExecParams& dest_params = fragment_exec_params_[dest_idx]; + dest_params.input_fragments.push_back(fragment.idx); + } + } } +void QuerySchedule::Validate() const { + // all fragments have a FragmentExecParams + int num_fragments = 0; + for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + DCHECK_LT(fragment.idx, fragment_exec_params_.size()); + DCHECK_EQ(fragment.idx, fragment_exec_params_[fragment.idx].fragment.idx); + ++num_fragments; + } + } + DCHECK_EQ(num_fragments, fragment_exec_params_.size()); + + // we assigned the correct number of scan ranges per (host, node id): + // assemble a map from host -> (map from node id -> #scan ranges) + unordered_map<TNetworkAddress, map<TPlanNodeId, int>> count_map; + for (const FragmentExecParams& fp: fragment_exec_params_) { + for (const FInstanceExecParams& ip: fp.instance_exec_params) { + auto host_it = count_map.find(ip.host); + if (host_it == count_map.end()) { + count_map.insert(make_pair(ip.host, map<TPlanNodeId, int>())); + host_it = count_map.find(ip.host); + } + map<TPlanNodeId, int>& node_map = host_it->second; + + for (const PerNodeScanRanges::value_type& instance_entry: ip.per_node_scan_ranges) { + TPlanNodeId node_id = instance_entry.first; + auto count_entry = node_map.find(node_id); + if (count_entry == node_map.end()) { + node_map.insert(make_pair(node_id, 0)); + count_entry = node_map.find(node_id); + } + count_entry->second += instance_entry.second.size(); + } + } + } + + for (const FragmentExecParams& fp: fragment_exec_params_) { + for (const FragmentScanRangeAssignment::value_type& assignment_entry: + fp.scan_range_assignment) { + const TNetworkAddress& host = assignment_entry.first; + DCHECK_GT(count_map.count(host), 0); + map<TPlanNodeId, int>& 
node_map = count_map.find(host)->second; + for (const PerNodeScanRanges::value_type& node_assignment: + assignment_entry.second) { + TPlanNodeId node_id = node_assignment.first; + DCHECK_GT(node_map.count(node_id), 0); + DCHECK_EQ(node_map[node_id], node_assignment.second.size()); + } + } + } +} int64_t QuerySchedule::GetClusterMemoryEstimate() const { DCHECK_GT(unique_hosts_.size(), 0); @@ -199,15 +233,8 @@ const TPlanFragment& FInstanceExecParams::fragment() const { int QuerySchedule::GetNumFragmentInstances() const { int result = 0; - if (mt_fragment_exec_params_.empty()) { - DCHECK(!fragment_exec_params_.empty()); - for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) { - result += fragment_exec_params.hosts.size(); - } - } else { - for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) { - result += fragment_exec_params.instance_exec_params.size(); - } + for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) { + result += fragment_exec_params.instance_exec_params.size(); } return result; } @@ -215,37 +242,35 @@ int QuerySchedule::GetNumFragmentInstances() const { const TPlanFragment* QuerySchedule::GetCoordFragment() const { // Only have coordinator fragment for statements that return rows. if (request_.stmt_type != TStmtType::QUERY) return nullptr; - bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0; - const TPlanFragment* fragment = is_mt_exec - ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0]; - - return fragment; + const TPlanFragment* fragment = &request_.plan_exec_info[0].fragments[0]; + DCHECK_EQ(fragment->partition.type, TPartitionType::UNPARTITIONED); + return fragment; } + void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const { fragments->clear(); - bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0; - if (is_mt_exec) { - for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) { - for (const TPlanFragment& fragment: plan_info.fragments) { - fragments->push_back(&fragment); - } - } - } else { - for (const TPlanFragment& fragment: request_.fragments) { + for (const TPlanExecInfo& plan_info: request_.plan_exec_info) { + for (const TPlanFragment& fragment: plan_info.fragments) { fragments->push_back(&fragment); } } } const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const { - const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0]; - DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED); - const MtFragmentExecParams* fragment_params = - &mt_fragment_exec_params_[coord_fragment.idx]; - DCHECK(fragment_params != nullptr); - DCHECK_EQ(fragment_params->instance_exec_params.size(), 1); - return fragment_params->instance_exec_params[0]; + DCHECK_EQ(request_.stmt_type, TStmtType::QUERY); + const TPlanFragment& coord_fragment = request_.plan_exec_info[0].fragments[0]; + const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx]; + DCHECK_EQ(fragment_params.instance_exec_params.size(), 1); + return fragment_params.instance_exec_params[0]; +} + +vector<int> FragmentExecParams::GetInstanceIdxs() const { + vector<int> result; + for (const FInstanceExecParams& instance_params: instance_exec_params) { + result.push_back(GetInstanceIdx(instance_params.instance_id)); + } + return result; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.h 
---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 77c9cd6..703c07c 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -35,7 +35,7 @@ namespace impala { class Coordinator; -struct MtFragmentExecParams; +struct FragmentExecParams; /// map from scan node id to a list of scan ranges typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; @@ -45,22 +45,6 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges> FragmentScanRangeAssignment; -/// execution parameters for a single fragment; used to assemble the -/// TPlanFragmentInstanceCtx; -/// hosts.size() == instance_ids.size() -struct FragmentExecParams { - std::vector<TNetworkAddress> hosts; // execution backends - std::vector<TUniqueId> instance_ids; - std::vector<TPlanFragmentDestination> destinations; - std::map<PlanNodeId, int> per_exch_num_senders; - FragmentScanRangeAssignment scan_range_assignment; - /// In its role as a data sender, a fragment instance is assigned a "sender id" to - /// uniquely identify it to a receiver. The id that a particular fragment instance - /// is assigned ranges from [sender_id_base, sender_id_base + N - 1], where - /// N = hosts.size (i.e. N = number of fragment instances) - int sender_id_base; -}; - /// execution parameters for a single fragment instance; used to assemble the /// TPlanFragmentInstanceCtx struct FInstanceExecParams { @@ -75,12 +59,12 @@ struct FInstanceExecParams { /// uniquely identify it to a receiver. -1 = invalid. int sender_id; - /// the parent MtFragmentExecParams - const MtFragmentExecParams& fragment_exec_params; + /// the parent FragmentExecParams + const FragmentExecParams& fragment_exec_params; const TPlanFragment& fragment() const; FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host, - int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params) + int per_fragment_instance_idx, const FragmentExecParams& fragment_exec_params) : instance_id(instance_id), host(host), per_fragment_instance_idx(per_fragment_instance_idx), sender_id(-1), @@ -88,7 +72,7 @@ struct FInstanceExecParams { }; /// Execution parameters shared between fragment instances -struct MtFragmentExecParams { +struct FragmentExecParams { /// output destinations of this fragment std::vector<TPlanFragmentDestination> destinations; @@ -105,8 +89,11 @@ struct MtFragmentExecParams { std::vector<FragmentIdx> input_fragments; std::vector<FInstanceExecParams> instance_exec_params; - MtFragmentExecParams(const TPlanFragment& fragment) + FragmentExecParams(const TPlanFragment& fragment) : is_coord_fragment(false), fragment(fragment) {} + + // extract instance indices from instance_exec_params.instance_id + std::vector<int> GetInstanceIdxs() const; }; /// A QuerySchedule contains all necessary information for a query coordinator to @@ -123,6 +110,11 @@ class QuerySchedule { const TQueryOptions& query_options, RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events); + /// Verifies that the schedule is well-formed (and DCHECKs if it isn't): + /// - all fragments have a FragmentExecParams + /// - all scan ranges are assigned + void Validate() const; + const TUniqueId& query_id() const { return query_id_; } const TQueryExecRequest& request() const { return request_; } const TQueryOptions& query_options() 
const { return query_options_; } @@ -165,30 +157,24 @@ class QuerySchedule { TUniqueId GetNextInstanceId(); const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const { - return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment; + FragmentIdx fragment_idx = GetFragmentIdx(node_id); + DCHECK_LT(fragment_idx, fragment_exec_params_.size()); + return fragment_exec_params_[fragment_idx].fragment; } - /// Map node ids to the index of their node inside their plan.nodes list. - /// TODO-MT: remove; only needed for the ST path - int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; } - const TPlanNode& GetNode(PlanNodeId id) const { const TPlanFragment& fragment = GetContainingFragment(id); return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]]; } - std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; } - const std::vector<FragmentExecParams>& exec_params() const { + const std::vector<FragmentExecParams>& fragment_exec_params() const { return fragment_exec_params_; } - const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const { - return mt_fragment_exec_params_; - } - const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const { - return mt_fragment_exec_params_[idx]; + const FragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const { + return fragment_exec_params_[idx]; } - MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) { - return &mt_fragment_exec_params_[idx]; + FragmentExecParams* GetFragmentExecParams(FragmentIdx idx) { + return &fragment_exec_params_[idx]; } const FInstanceExecParams& GetCoordInstanceExecParams() const; @@ -211,6 +197,8 @@ class QuerySchedule { /// The query options from the TClientRequest const TQueryOptions& query_options_; + + /// TODO: move these into QueryState RuntimeProfile* summary_profile_; RuntimeProfile::EventSequence* query_events_; @@ -220,21 +208,14 @@ class QuerySchedule { /// Maps from plan node id to its index in plan.nodes. Filled in c'tor. std::vector<int32_t> plan_node_to_plan_node_idx_; - /// vector is indexed by fragment index from TQueryExecRequest.fragments; - /// populated by Scheduler::Schedule() + // populated in Init() and Scheduler::Schedule() + // (SimpleScheduler::ComputeFInstanceExecParams()), indexed by fragment idx + // (TPlanFragment.idx) std::vector<FragmentExecParams> fragment_exec_params_; - // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams()) - // indexed by fragment idx (TPlanFragment.idx) - std::vector<MtFragmentExecParams> mt_fragment_exec_params_; - /// The set of hosts that the query will run on excluding the coordinator. boost::unordered_set<TNetworkAddress> unique_hosts_; - /// Number of backends executing plan fragments on behalf of this query. - /// TODO-MT: remove - int64_t num_fragment_instances_; - /// Total number of scan ranges of this query. int64_t num_scan_ranges_; @@ -247,10 +228,10 @@ class QuerySchedule { /// Indicates if the query has been admitted for execution. bool is_admitted_; - /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info. + /// Populate fragment_exec_params_ from request_.plan_exec_info. /// Sets is_coord_fragment and input_fragments. /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_. 
- void MtInit(); + void Init(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc index d3f5584..911279c 100644 --- a/be/src/scheduling/simple-scheduler-test-util.cc +++ b/be/src/scheduling/simple-scheduler-test-util.cc @@ -202,7 +202,7 @@ const vector<TNetworkAddress>& Plan::referenced_datanodes() const { return referenced_datanodes_; } -const vector<TScanRangeLocations>& Plan::scan_range_locations() const { +const vector<TScanRangeLocationList>& Plan::scan_range_locations() const { return scan_range_locations_; } @@ -211,14 +211,14 @@ void Plan::AddTableScan(const TableName& table_name) { const vector<Block>& blocks = table.blocks; for (int i = 0; i < blocks.size(); ++i) { const Block& block = blocks[i]; - TScanRangeLocations scan_range_locations; - BuildTScanRangeLocations(table_name, block, i, &scan_range_locations); + TScanRangeLocationList scan_range_locations; + BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations); scan_range_locations_.push_back(scan_range_locations); } } -void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block, - int block_idx, TScanRangeLocations* scan_range_locations) { +void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block& block, + int block_idx, TScanRangeLocationList* scan_range_locations) { const vector<int>& replica_idxs = block.replica_host_idxs; const vector<bool>& is_cached = block.replica_host_idx_is_cached; DCHECK_EQ(replica_idxs.size(), is_cached.size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h index ab46e2a..82190ad 100644 --- a/be/src/scheduling/simple-scheduler-test-util.h +++ b/be/src/scheduling/simple-scheduler-test-util.h @@ -231,11 +231,11 @@ class Plan { const std::vector<TNetworkAddress>& referenced_datanodes() const; - const std::vector<TScanRangeLocations>& scan_range_locations() const; + const std::vector<TScanRangeLocationList>& scan_range_locations() const; /// Add a scan of table 'table_name' to the plan. This method will populate the internal - /// list of TScanRangeLocations and can be called multiple times for the same table to - /// schedule additional scans. + /// list of TScanRangeLocationList and can be called multiple times for the same table + /// to schedule additional scans. void AddTableScan(const TableName& table_name); private: @@ -252,11 +252,11 @@ class Plan { std::unordered_map<int, int> host_idx_to_datanode_idx_; /// List of all scan range locations, which can be passed to the SimpleScheduler. - std::vector<TScanRangeLocations> scan_range_locations_; + std::vector<TScanRangeLocationList> scan_range_locations_; - /// Initialize a TScanRangeLocations object in place. - void BuildTScanRangeLocations(const TableName& table_name, const Block& block, - int block_idx, TScanRangeLocations* scan_range_locations); + /// Initialize a TScanRangeLocationList object in place. 
+ void BuildTScanRangeLocationList(const TableName& table_name, const Block& block, + int block_idx, TScanRangeLocationList* scan_range_locations); void BuildScanRange(const TableName& table_name, const Block& block, int block_idx, TScanRange* scan_range);
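A minimal sketch (simplified stand-in types, hypothetical launch callback) of the launch loop that Coordinator::StartFInstances() now uses per the diff above: one fragment instance started per FInstanceExecParams, rather than one per (fragment, host) pair as in the removed StartFragments():

#include <functional>
#include <vector>

// Simplified stand-ins; the real structs are defined in
// be/src/scheduling/query-schedule.h.
struct FInstanceExecParams {};
struct FragmentExecParams { std::vector<FInstanceExecParams> instance_exec_params; };

// Launch one fragment instance per FInstanceExecParams. 'launch' stands in for
// the thread-pool Offer() of Coordinator::ExecRemoteFInstance() in the patch.
int StartAllInstances(const std::vector<FragmentExecParams>& fragment_exec_params,
                      const std::function<void(const FInstanceExecParams&)>& launch) {
  int num_instances = 0;
  for (const FragmentExecParams& fragment_params : fragment_exec_params) {
    num_instances += static_cast<int>(fragment_params.instance_exec_params.size());
    for (const FInstanceExecParams& instance_params :
         fragment_params.instance_exec_params) {
      launch(instance_params);
    }
  }
  return num_instances;
}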
