IMPALA-3627: Clean up RPC structures in ImpalaInternalService This change is a prerequisite for IMPALA-2550.
Change-Id: I0659c94f6b80bd7bbe0bd150ce243f9efa9a41ad TODO: Write commit message Reviewed-on: http://gerrit.cloudera.org:8080/3202 Reviewed-by: Lars Volker <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5be7c68e Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5be7c68e Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5be7c68e Branch: refs/heads/master Commit: 5be7c68ed86bee90bec1e9d64ec8829c38d2026d Parents: 32c40f9 Author: Lars Volker <[email protected]> Authored: Tue May 24 13:11:18 2016 +0200 Committer: Tim Armstrong <[email protected]> Committed: Tue May 31 23:32:12 2016 -0700 ---------------------------------------------------------------------- be/src/exec/data-sink.cc | 6 +- be/src/exec/data-sink.h | 7 +- be/src/exec/union-node.cc | 2 +- be/src/runtime/coordinator.cc | 111 +++++++++++++----------- be/src/runtime/coordinator.h | 13 +-- be/src/runtime/plan-fragment-executor.cc | 75 ++++++++-------- be/src/runtime/plan-fragment-executor.h | 3 +- be/src/runtime/runtime-state.cc | 12 +-- be/src/runtime/runtime-state.h | 12 +-- be/src/runtime/test-env.cc | 4 +- be/src/scheduling/query-schedule.h | 2 +- be/src/service/fragment-exec-state.cc | 4 +- be/src/service/fragment-exec-state.h | 11 +-- be/src/service/fragment-mgr.cc | 10 +-- be/src/service/impala-server.cc | 4 +- common/thrift/ImpalaInternalService.thrift | 95 ++++++++++---------- common/thrift/PlanNodes.thrift | 2 +- 17 files changed, 193 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index dc3a17c..9440368 100644 --- a/be/src/exec/data-sink.cc +++ 
b/be/src/exec/data-sink.cc @@ -36,7 +36,7 @@ namespace impala { Status DataSink::CreateDataSink(ObjectPool* pool, const TDataSink& thrift_sink, const vector<TExpr>& output_exprs, - const TPlanFragmentExecParams& params, + const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor& row_desc, scoped_ptr<DataSink>* sink) { DataSink* tmp_sink = NULL; switch (thrift_sink.type) { @@ -47,8 +47,8 @@ Status DataSink::CreateDataSink(ObjectPool* pool, // TODO: figure out good buffer size based on size of output row tmp_sink = new DataStreamSender(pool, - params.sender_id, row_desc, thrift_sink.stream_sink, - params.destinations, 16 * 1024); + fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink, + fragment_instance_ctx.destinations, 16 * 1024); sink->reset(tmp_sink); break; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/data-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index 8935727..9c9b708 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -32,7 +32,7 @@ class RuntimeProfile; class RuntimeState; class TPlanExecRequest; class TPlanExecParams; -class TPlanFragmentExecParams; +class TPlanFragmentInstanceCtx; class RowDescriptor; /// Superclass of all data sinks. @@ -66,7 +66,7 @@ class DataSink { /// new sink is written to *sink, and is owned by the caller. static Status CreateDataSink(ObjectPool* pool, const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs, - const TPlanFragmentExecParams& params, + const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink); /// Returns the runtime profile for the sink. 
@@ -77,7 +77,8 @@ class DataSink { static void MergeInsertStats(const TInsertStats& src_stats, TInsertStats* dst_stats); - /// Outputs the insert stats contained in the map of insert partition updates to a string + /// Outputs the insert stats contained in the map of insert partition updates to a + /// string static std::string OutputInsertStats(const PartitionStatusMap& stats, const std::string& prefix = ""); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index 42d9b97..2e4cbf5 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -168,7 +168,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { // Evaluate and materialize the const expr lists exactly once. while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) { // Only evaluate the const expr lists by the first fragment instance. - if (state->fragment_ctx().per_fragment_instance_idx == 0) { + if (state->fragment_ctx().fragment_instance_idx == 0) { // Materialize expr results into row_batch. RETURN_IF_ERROR(EvalAndMaterializeExprs( const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 7cb6c97..6f7368c 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -95,24 +95,24 @@ namespace impala { // Maximum number of fragment instances that can publish each broadcast filter. 
static const int MAX_BROADCAST_FILTER_PRODUCERS = 3; -// container for debug options in TPlanFragmentExecParams (debug_node, debug_action, +// container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action, // debug_phase) struct DebugOptions { - int fragment_instance_idx; + int instance_state_idx; int node_id; TDebugAction::type action; TExecNodePhase::type phase; // INVALID: debug options invalid DebugOptions() - : fragment_instance_idx(-1), node_id(-1), action(TDebugAction::WAIT), + : instance_state_idx(-1), node_id(-1), action(TDebugAction::WAIT), phase(TExecNodePhase::INVALID) {} // If these debug options apply to the candidate fragment instance, returns true // otherwise returns false. bool IsApplicable(int candidate_fragment_instance_idx) { if (phase == TExecNodePhase::INVALID) return false; - return (fragment_instance_idx == -1 || - fragment_instance_idx == candidate_fragment_instance_idx); + return (instance_state_idx == -1 || + instance_state_idx == candidate_fragment_instance_idx); } }; @@ -360,12 +360,12 @@ static void ProcessQueryOptions( split(components, query_options.debug_action, is_any_of(":"), token_compress_on); if (components.size() < 3 || components.size() > 4) return; if (components.size() == 3) { - debug_options->fragment_instance_idx = -1; + debug_options->instance_state_idx = -1; debug_options->node_id = atoi(components[0].c_str()); debug_options->phase = GetExecNodePhase(components[1]); debug_options->action = GetDebugAction(components[2]); } else { - debug_options->fragment_instance_idx = atoi(components[0].c_str()); + debug_options->instance_state_idx = atoi(components[0].c_str()); debug_options->node_id = atoi(components[1].c_str()); debug_options->phase = GetExecNodePhase(components[2]); debug_options->action = GetDebugAction(components[3]); @@ -552,7 +552,7 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { query_events_->MarkEvent( Substitute("Ready to start $0 remote fragments", 
num_fragment_instances)); - int fragment_instance_idx = 0; + int instance_state_idx = 0; bool has_coordinator_fragment = request.fragments[0].partition.type == TPartitionType::UNPARTITIONED; int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0; @@ -565,13 +565,13 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { int num_hosts = params->hosts.size(); DCHECK_GT(num_hosts, 0); UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts, - fragment_instance_idx); - fragment_instance_idx += num_hosts; + instance_state_idx); + instance_state_idx += num_hosts; } MarkFilterRoutingTableComplete(); } - fragment_instance_idx = 0; + instance_state_idx = 0; // Start one fragment instance per fragment per host (number of hosts running each // fragment may not be constant). for (int fragment_idx = first_remote_fragment_idx; @@ -584,24 +584,24 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) { // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and // so on. - for (int per_fragment_instance_idx = 0; per_fragment_instance_idx < num_hosts; - ++per_fragment_instance_idx) { + for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts; + ++fragment_instance_idx) { DebugOptions* fragment_instance_debug_options = - debug_options.IsApplicable(fragment_instance_idx) ? &debug_options : NULL; + debug_options.IsApplicable(instance_state_idx) ? 
&debug_options : NULL; exec_env_->fragment_exec_thread_pool()->Offer( bind<void>(mem_fn(&Coordinator::ExecRemoteFragment), this, params, // fragment_exec_params &request.fragments[fragment_idx], // plan_fragment, fragment_instance_debug_options, schedule, - fragment_instance_idx++, + instance_state_idx++, fragment_idx, - per_fragment_instance_idx)); + fragment_instance_idx)); } } exec_complete_barrier_->Wait(); query_events_->MarkEvent( - Substitute("All $0 remote fragments started", fragment_instance_idx)); + Substitute("All $0 remote fragments started", instance_state_idx)); Status status = Status::OK(); const TMetricDef& def = @@ -1360,23 +1360,24 @@ int64_t Coordinator::ComputeTotalScanRangesComplete(int node_id) { void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_params, const TPlanFragment* plan_fragment, DebugOptions* debug_options, - QuerySchedule* schedule, int fragment_instance_idx, int fragment_idx, - int per_fragment_instance_idx) { + QuerySchedule* schedule, int instance_state_idx, int fragment_idx, + int fragment_instance_idx) { NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); TExecPlanFragmentParams rpc_params; SetExecPlanFragmentParams(*schedule, *plan_fragment, *fragment_exec_params, - fragment_instance_idx, fragment_idx, per_fragment_instance_idx, + instance_state_idx, fragment_idx, fragment_instance_idx, MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port), &rpc_params); if (debug_options != NULL) { - rpc_params.params.__set_debug_node_id(debug_options->node_id); - rpc_params.params.__set_debug_action(debug_options->action); - rpc_params.params.__set_debug_phase(debug_options->phase); + rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); + rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); + rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); } FragmentInstanceState* exec_state = obj_pool()->Add( - new FragmentInstanceState(fragment_idx, 
fragment_exec_params, - per_fragment_instance_idx, obj_pool())); - exec_state->ComputeTotalSplitSize(rpc_params.params.per_node_scan_ranges); - fragment_instance_states_[fragment_instance_idx] = exec_state; + new FragmentInstanceState(fragment_idx, fragment_exec_params, fragment_instance_idx, + obj_pool())); + exec_state->ComputeTotalSplitSize( + rpc_params.fragment_instance_ctx.per_node_scan_ranges); + fragment_instance_states_[instance_state_idx] = exec_state; VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_ << " instance_id=" << exec_state->fragment_instance_id() << " host=" << exec_state->impalad_address(); @@ -1510,13 +1511,13 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_ << " status=" << params.status.status_code << " done=" << (params.done ? "true" : "false"); - uint32_t fragment_instance_idx = params.fragment_instance_idx; - if (fragment_instance_idx >= fragment_instance_states_.size()) { + uint32_t instance_state_idx = params.instance_state_idx; + if (instance_state_idx >= fragment_instance_states_.size()) { return Status(TErrorCode::INTERNAL_ERROR, Substitute("Unknown fragment instance index $0 (max known: $1)", - fragment_instance_idx, fragment_instance_states_.size() - 1)); + instance_state_idx, fragment_instance_states_.size() - 1)); } - FragmentInstanceState* exec_state = fragment_instance_states_[fragment_instance_idx]; + FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx]; const TRuntimeProfileTree& cumulative_profile = params.profile; Status status(params.status); @@ -1611,7 +1612,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para lock_guard<mutex> l(lock_); exec_state->stopwatch()->Stop(); DCHECK_GT(num_remaining_fragment_instances_, 0); - VLOG_QUERY << "Fragment instance " << params.fragment_instance_idx << "(" + VLOG_QUERY << "Fragment instance " << 
params.instance_state_idx << "(" << exec_state->fragment_instance_id() << ") on host " << exec_state->impalad_address() << " completed, " << num_remaining_fragment_instances_ - 1 << " remaining: query_id=" @@ -1857,13 +1858,18 @@ string Coordinator::GetErrorLog() { void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, const TPlanFragment& fragment, const FragmentExecParams& params, - int fragment_instance_idx, int fragment_idx, int per_fragment_instance_idx, + int instance_state_idx, int fragment_idx, int fragment_instance_idx, const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) { rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); - rpc_params->__set_fragment(fragment); + rpc_params->__set_query_ctx(query_ctx_); + + TPlanFragmentCtx fragment_ctx; + TPlanFragmentInstanceCtx fragment_instance_ctx; + + fragment_ctx.__set_fragment(fragment); // Remove filters that weren't selected during filter routing table construction. if (filter_mode_ != TRuntimeFilterMode::OFF) { - for (TPlanNode& plan_node: rpc_params->fragment.plan.nodes) { + for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) { if (plan_node.__isset.runtime_filters) { vector<TRuntimeFilterDesc> required_filters; for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) { @@ -1872,7 +1878,7 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, if (filter_it == filter_routing_table_.end()) continue; FilterState* f = &filter_it->second; if (plan_node.__isset.hash_join_node) { - if (f->src_fragment_instance_idxs.find(fragment_instance_idx) == + if (f->src_fragment_instance_idxs.find(instance_state_idx) == f->src_fragment_instance_idxs.end()) { DCHECK(desc.is_broadcast_join); continue; @@ -1889,7 +1895,7 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, } SetExecPlanDescriptorTable(fragment, rpc_params); - TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx]; + TNetworkAddress exec_host 
= params.hosts[fragment_instance_idx]; if (schedule.HasReservation()) { // The reservation has already have been validated at this point. TNetworkAddress resource_hostport; @@ -1900,33 +1906,32 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, // fragment. Otherwise, don't set it (usually this the coordinator fragment), and it // won't participate in dynamic RM controls. if (it != schedule.reservation()->allocated_resources.end()) { - rpc_params->__set_reserved_resource(it->second); - rpc_params->__set_local_resource_address(resource_hostport); + fragment_instance_ctx.__set_reserved_resource(it->second); + fragment_instance_ctx.__set_local_resource_address(resource_hostport); } } - rpc_params->params.__set_request_pool(schedule.request_pool()); FragmentScanRangeAssignment::const_iterator it = params.scan_range_assignment.find(exec_host); // Scan ranges may not always be set, so use an empty structure if so. const PerNodeScanRanges& scan_ranges = (it != params.scan_range_assignment.end()) ? 
it->second : PerNodeScanRanges(); - rpc_params->params.__set_per_node_scan_ranges(scan_ranges); - rpc_params->params.__set_per_exch_num_senders(params.per_exch_num_senders); - rpc_params->params.__set_destinations(params.destinations); - rpc_params->params.__set_sender_id(params.sender_id_base + per_fragment_instance_idx); - rpc_params->__isset.params = true; - rpc_params->fragment_instance_ctx.__set_query_ctx(query_ctx_); - rpc_params->fragment_instance_ctx.fragment_instance_id = - params.instance_ids[per_fragment_instance_idx]; - rpc_params->fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx; - rpc_params->fragment_instance_ctx.num_fragment_instances = params.instance_ids.size(); - rpc_params->fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx; - rpc_params->__isset.fragment_instance_ctx = true; + fragment_ctx.num_fragment_instances = params.instance_ids.size(); + fragment_instance_ctx.__set_request_pool(schedule.request_pool()); + fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges); + fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders); + fragment_instance_ctx.__set_destinations(params.destinations); + fragment_instance_ctx.__set_sender_id(params.sender_id_base + fragment_instance_idx); + fragment_instance_ctx.fragment_instance_id = params.instance_ids[fragment_instance_idx]; + fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx; + fragment_instance_ctx.instance_state_idx = instance_state_idx; + rpc_params->__set_fragment_ctx(fragment_ctx); + rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); } void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, TExecPlanFragmentParams* rpc_params) { + DCHECK(rpc_params->__isset.query_ctx); TDescriptorTable thrift_desc_tbl; // Always add the Tuple and Slot descriptors. 
@@ -1991,7 +1996,7 @@ void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, thrift_desc_tbl.__isset.tableDescriptors = true; } - rpc_params->__set_desc_tbl(thrift_desc_tbl); + rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl); } namespace { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 578c61d..b4eab0e 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -425,14 +425,15 @@ class Coordinator { void MarkFilterRoutingTableComplete(); /// Fill in rpc_params based on parameters. - /// 'fragment_instance_idx' is the 0-based query-wide ordinal of the fragment instance. + /// 'instance_state_idx' is the index of the fragment instance state in + /// fragment_instance_states_. /// 'fragment_idx' is the 0-based query-wide ordinal of the fragment of which it is an /// instance. - /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment + /// 'fragment_instance_idx' is the 0-based ordinal of this particular fragment /// instance within its fragment. void SetExecPlanFragmentParams(QuerySchedule& schedule, const TPlanFragment& fragment, - const FragmentExecParams& params, int fragment_instance_idx, int fragment_idx, - int per_fragment_instance_idx, const TNetworkAddress& coord, + const FragmentExecParams& params, int instance_state_idx, int fragment_idx, + int fragment_instance_idx, const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params); /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from @@ -440,8 +441,8 @@ class Coordinator { /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad. 
void ExecRemoteFragment(const FragmentExecParams* fragment_exec_params, const TPlanFragment* plan_fragment, DebugOptions* debug_options, - QuerySchedule* schedule, int fragment_instance_idx, int fragment_idx, - int per_fragment_instance_idx); + QuerySchedule* schedule, int instance_state_idx, int fragment_idx, + int fragment_instance_idx); /// Determine fragment number, given fragment id. int GetFragmentNum(const TUniqueId& fragment_id); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index 75c5f58..ef39206 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -88,20 +88,23 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { is_prepared_ = true; // TODO: Break this method up. fragment_sw_.Start(); - const TPlanFragmentExecParams& params = request.params; - query_id_ = request.fragment_instance_ctx.query_ctx.query_id; + const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx; + query_id_ = request.query_ctx.query_id; VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id=" << PrintId(request.fragment_instance_ctx.fragment_instance_id); - VLOG(2) << "params:\n" << ThriftDebugString(params); + VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx); - if (request.__isset.reserved_resource) { + DCHECK(request.__isset.fragment_ctx); + bool request_has_reserved_resource = + request.fragment_instance_ctx.__isset.reserved_resource; + if (request_has_reserved_resource) { VLOG_QUERY << "Executing fragment in reserved resource:\n" - << request.reserved_resource; + << request.fragment_instance_ctx.reserved_resource; } string cgroup = ""; - if (FLAGS_enable_rm && request.__isset.reserved_resource) 
{ + if (FLAGS_enable_rm && request_has_reserved_resource) { cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_")); } @@ -114,34 +117,34 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { SCOPED_TIMER(profile()->total_time_counter()); // Register after setting runtime_state_ to ensure proper cleanup. - if (FLAGS_enable_rm && !cgroup.empty() && request.__isset.reserved_resource) { + if (FLAGS_enable_rm && !cgroup.empty() && request_has_reserved_resource) { bool is_first; RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment( request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first)); // The first fragment using cgroup sets the cgroup's CPU shares based on the reserved // resource. if (is_first) { - DCHECK(request.__isset.reserved_resource); + DCHECK(request_has_reserved_resource); int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares( - request.reserved_resource.v_cpu_cores); + request.fragment_instance_ctx.reserved_resource.v_cpu_cores); RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares)); } } // TODO: Find the reservation id when the resource request is not set - if (FLAGS_enable_rm && request.__isset.reserved_resource) { + if (FLAGS_enable_rm && request_has_reserved_resource) { TUniqueId reservation_id; - reservation_id << request.reserved_resource.reservation_id; + reservation_id << request.fragment_instance_ctx.reserved_resource.reservation_id; // TODO: Combine this with RegisterFragment() etc. 
QueryResourceMgr* res_mgr; bool is_first = exec_env_->resource_broker()->GetQueryResourceMgr(query_id_, - reservation_id, request.local_resource_address, &res_mgr); + reservation_id, request.fragment_instance_ctx.local_resource_address, &res_mgr); DCHECK(res_mgr != NULL); runtime_state_->SetQueryResourceMgr(res_mgr); if (is_first) { runtime_state_->query_resource_mgr()->InitVcoreAcquisition( - request.reserved_resource.v_cpu_cores); + request.fragment_instance_ctx.reserved_resource.v_cpu_cores); } } @@ -155,9 +158,11 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { } int64_t rm_reservation_size_bytes = -1; - if (request.__isset.reserved_resource && request.reserved_resource.memory_mb > 0) { - rm_reservation_size_bytes = - static_cast<int64_t>(request.reserved_resource.memory_mb) * 1024L * 1024L; + if (request_has_reserved_resource && + request.fragment_instance_ctx.reserved_resource.memory_mb > 0) { + int64_t rm_reservation_size_mb = + static_cast<int64_t>(request.fragment_instance_ctx.reserved_resource.memory_mb); + rm_reservation_size_bytes = rm_reservation_size_mb * 1024L * 1024L; // Queries that use more than the hard limit will be killed, so it's not useful to // have a reservation larger than the hard limit. Clamp reservation bytes limit to the // hard limit (if it exists). 
@@ -171,8 +176,8 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES); } - DCHECK(!params.request_pool.empty()); - runtime_state_->InitMemTrackers(query_id_, &params.request_pool, + DCHECK(!fragment_instance_ctx.request_pool.empty()); + runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool, bytes_limit, rm_reservation_size_bytes); RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); @@ -197,25 +202,26 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { // set up desc tbl DescriptorTbl* desc_tbl = NULL; - DCHECK(request.__isset.desc_tbl); + DCHECK(request.__isset.query_ctx); + DCHECK(request.query_ctx.__isset.desc_tbl); RETURN_IF_ERROR( - DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl)); + DescriptorTbl::Create(obj_pool(), request.query_ctx.desc_tbl, &desc_tbl)); runtime_state_->set_desc_tbl(desc_tbl); VLOG_QUERY << "descriptor table for fragment=" << request.fragment_instance_ctx.fragment_instance_id << "\n" << desc_tbl->DebugString(); // set up plan - DCHECK(request.__isset.fragment); - RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(), request.fragment.plan, - *desc_tbl, &plan_)); + DCHECK(request.__isset.fragment_ctx); + RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(), + request.fragment_ctx.fragment.plan, *desc_tbl, &plan_)); runtime_state_->set_fragment_root_id(plan_->id()); - if (request.params.__isset.debug_node_id) { - DCHECK(request.params.__isset.debug_action); - DCHECK(request.params.__isset.debug_phase); - ExecNode::SetDebugOptions(request.params.debug_node_id, - request.params.debug_phase, request.params.debug_action, plan_); + if (fragment_instance_ctx.__isset.debug_node_id) { + DCHECK(fragment_instance_ctx.__isset.debug_action); + DCHECK(fragment_instance_ctx.__isset.debug_phase); + ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id, + 
fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, plan_); } // set #senders of exchange nodes before calling Prepare() @@ -224,7 +230,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { for (ExecNode* exch_node: exch_nodes) { DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); - int num_senders = FindWithDefault(params.per_exch_num_senders, + int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders, exch_node->id(), 0); DCHECK_GT(num_senders, 0); static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders); @@ -237,7 +243,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { for (int i = 0; i < scan_nodes.size(); ++i) { ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]); const vector<TScanRangeParams>& scan_ranges = FindWithDefault( - params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + fragment_instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges); scan_node->SetScanRanges(scan_ranges); } @@ -247,13 +253,14 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get())); } - PrintVolumeIds(params.per_node_scan_ranges); + PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges); // set up sink, if required - if (request.fragment.__isset.output_sink) { + if (request.fragment_ctx.fragment.__isset.output_sink) { RETURN_IF_ERROR(DataSink::CreateDataSink( - obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, - params, row_desc(), &sink_)); + obj_pool(), request.fragment_ctx.fragment.output_sink, + request.fragment_ctx.fragment.output_exprs, + fragment_instance_ctx, row_desc(), &sink_)); RETURN_IF_ERROR(sink_->Prepare(runtime_state())); RuntimeProfile* sink_profile = sink_->profile(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/plan-fragment-executor.h 
---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index 29250a3..082442e 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -40,7 +40,6 @@ class RuntimeState; class TRowBatch; class TPlanExecRequest; class TPlanFragment; -class TPlanFragmentExecParams; class TPlanExecParams; /// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment, @@ -242,7 +241,7 @@ class PlanFragmentExecutor { ObjectPool* obj_pool() { return runtime_state_->obj_pool(); } - /// typedef for TPlanFragmentExecParams.per_node_scan_ranges + /// typedef for TPlanFragmentInstanceCtx.per_node_scan_ranges typedef std::map<TPlanNodeId, std::vector<TScanRangeParams> > PerNodeScanRanges; /// Main loop of profile reporting thread. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index c970163..3701907 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -68,8 +68,8 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params, const string& cgroup, ExecEnv* exec_env) : obj_pool_(new ObjectPool()), fragment_params_(fragment_params), - now_(new TimestampValue(fragment_ctx().query_ctx.now_string.c_str(), - fragment_ctx().query_ctx.now_string.size())), + now_(new TimestampValue(query_ctx().now_string.c_str(), + query_ctx().now_string.size())), cgroup_(cgroup), codegen_expr_(false), profile_(obj_pool_.get(), @@ -77,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params, is_cancelled_(false), query_resource_mgr_(NULL), root_node_id_(-1), - filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) { + filter_bank_(new 
RuntimeFilterBank(query_ctx(), this)) { Status status = Init(exec_env); DCHECK(status.ok()) << status.GetDetail(); } @@ -93,8 +93,8 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx) query_resource_mgr_(NULL), root_node_id_(-1), filter_bank_(new RuntimeFilterBank(query_ctx, this)) { - fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx); - fragment_params_.fragment_instance_ctx.query_ctx.request.query_options + fragment_params_.__set_query_ctx(query_ctx); + fragment_params_.query_ctx.request.query_options .__set_batch_size(DEFAULT_BATCH_SIZE); } @@ -119,7 +119,7 @@ Status RuntimeState::Init(ExecEnv* exec_env) { SCOPED_TIMER(profile_.total_time_counter()); exec_env_ = exec_env; TQueryOptions& query_options = - fragment_params_.fragment_instance_ctx.query_ctx.request.query_options; + fragment_params_.query_ctx.request.query_options; // max_errors does not indicate how many errors in total have been recorded, but rather // how many are distinct. It is defined as the sum of the number of generic errors and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 15c8d9c..22cecaf 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -100,7 +100,7 @@ class RuntimeState { bool abort_on_default_limit_exceeded() const { return query_ctx().request.query_options.abort_on_default_limit_exceeded; } - const TQueryCtx& query_ctx() const { return fragment_ctx().query_ctx; } + const TQueryCtx& query_ctx() const { return fragment_params_.query_ctx; } const TPlanFragmentInstanceCtx& fragment_ctx() const { return fragment_params_.fragment_instance_ctx; } @@ -243,9 +243,9 @@ class RuntimeState { Status SetMemLimitExceeded(MemTracker* tracker = NULL, int64_t failed_allocation_size = 0, const ErrorMsg* msg = NULL); - /// Returns a non-OK status 
if query execution should stop (e.g., the query was cancelled - /// or a mem limit was exceeded). Exec nodes should check this periodically so execution - /// doesn't continue if the query terminates abnormally. + /// Returns a non-OK status if query execution should stop (e.g., the query was + /// cancelled or a mem limit was exceeded). Exec nodes should check this periodically so + /// execution doesn't continue if the query terminates abnormally. Status CheckQueryState(); QueryResourceMgr* query_resource_mgr() const { return query_resource_mgr_; } @@ -337,8 +337,8 @@ class RuntimeState { SpinLock query_status_lock_; Status query_status_; - /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by the - /// ResourceBroker instead. + /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by + /// the ResourceBroker instead. QueryResourceMgr* query_resource_mgr_; /// Reader contexts that need to be closed when the fragment is closed. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 39fc7cd..223ecad 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -61,8 +61,8 @@ TestEnv::~TestEnv() { RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) { TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); - plan_params.fragment_instance_ctx.query_ctx.query_id.hi = 0; - plan_params.fragment_instance_ctx.query_ctx.query_id.lo = query_id; + plan_params.query_ctx.query_id.hi = 0; + plan_params.query_ctx.query_id.lo = query_id; return new RuntimeState(plan_params, "", exec_env_.get()); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 5c891a8..20f7dfe 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -42,7 +42,7 @@ typedef boost::unordered_map<TNetworkAddress, PerNodeScanRanges> FragmentScanRangeAssignment; /// execution parameters for a single fragment; used to assemble the -/// per-fragment instance TPlanFragmentExecParams; +/// TPlanFragmentInstanceCtx; /// hosts.size() == instance_ids.size() struct FragmentExecParams { std::vector<TNetworkAddress> hosts; // execution backends http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc index 38866f1..e80cd78 100644 --- a/be/src/service/fragment-exec-state.cc +++ b/be/src/service/fragment-exec-state.cc @@ -77,8 +77,8 @@ void 
FragmentMgr::FragmentExecState::ReportStatusCb( TReportExecStatusParams params; params.protocol_version = ImpalaInternalServiceVersion::V1; - params.__set_query_id(fragment_instance_ctx_.query_ctx.query_id); - params.__set_fragment_instance_idx(fragment_instance_ctx_.fragment_instance_idx); + params.__set_query_id(query_ctx_.query_id); + params.__set_instance_state_idx( fragment_instance_ctx_.instance_state_idx); params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id); exec_status.SetTStatus(&params); params.__set_done(done); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h index c940386..8c36198 100644 --- a/be/src/service/fragment-exec-state.h +++ b/be/src/service/fragment-exec-state.h @@ -29,7 +29,7 @@ namespace impala { class FragmentMgr::FragmentExecState { public: FragmentExecState(const TExecPlanFragmentParams& params, ExecEnv* exec_env) - : fragment_instance_ctx_(params.fragment_instance_ctx), + : query_ctx_(params.query_ctx), fragment_instance_ctx_(params.fragment_instance_ctx), executor_(exec_env, boost::bind<void>( boost::mem_fn(&FragmentMgr::FragmentExecState::ReportStatusCb), this, _1, _2, _3)), @@ -50,17 +50,13 @@ class FragmentMgr::FragmentExecState { /// Main loop of plan fragment execution. Blocks until execution finishes. 
void Exec(); - const TUniqueId& query_id() const { - return fragment_instance_ctx_.query_ctx.query_id; - } + const TUniqueId& query_id() const { return query_ctx_.query_id; } const TUniqueId& fragment_instance_id() const { return fragment_instance_ctx_.fragment_instance_id; } - const TNetworkAddress& coord_address() const { - return fragment_instance_ctx_.query_ctx.coord_address; - } + const TNetworkAddress& coord_address() const { return query_ctx_.coord_address; } /// Set the execution thread, taking ownership of the object. void set_exec_thread(Thread* exec_thread) { exec_thread_.reset(exec_thread); } @@ -69,6 +65,7 @@ class FragmentMgr::FragmentExecState { void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter); private: + TQueryCtx query_ctx_; TPlanFragmentInstanceCtx fragment_instance_ctx_; PlanFragmentExecutor executor_; ImpalaBackendClientCache* client_cache_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc index f64ca7e..8472832 100644 --- a/be/src/service/fragment-mgr.cc +++ b/be/src/service/fragment-mgr.cc @@ -36,9 +36,9 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) { VLOG_QUERY << "ExecPlanFragment() instance_id=" << exec_params.fragment_instance_ctx.fragment_instance_id - << " coord=" << exec_params.fragment_instance_ctx.query_ctx.coord_address + << " coord=" << exec_params.query_ctx.coord_address << " fragment instance#=" - << exec_params.fragment_instance_ctx.fragment_instance_idx; + << exec_params.fragment_instance_ctx.instance_state_idx; // Preparing and opening the fragment creates a thread and consumes a non-trivial // amount of memory. 
If we are already starved for memory, cancel the fragment as @@ -48,13 +48,13 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) string msg = Substitute("Instance $0 of plan fragment $1 of query $2 could not " "start because the backend Impala daemon is over its memory limit", PrintId(exec_params.fragment_instance_ctx.fragment_instance_id), - exec_params.fragment.display_name, - PrintId(exec_params.fragment_instance_ctx.query_ctx.query_id)); + exec_params.fragment_ctx.fragment.display_name, + PrintId(exec_params.query_ctx.query_id)); return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); } // Remote fragments must always have a sink. Remove when IMPALA-2905 is resolved. - DCHECK(exec_params.fragment.__isset.output_sink); + DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink); shared_ptr<FragmentExecState> exec_state( new FragmentExecState(exec_params, ExecEnv::GetInstance())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index e4a08ea..53e7dde 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1087,7 +1087,7 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id, void ImpalaServer::ReportExecStatus( TReportExecStatusResult& return_val, const TReportExecStatusParams& params) { VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id - << " fragment instance#=" << params.fragment_instance_idx + << " fragment instance#=" << params.instance_state_idx << " instance_id=" << params.fragment_instance_id << " done=" << (params.done ? 
"true" : "false"); // TODO: implement something more efficient here, we're currently @@ -1104,7 +1104,7 @@ void ImpalaServer::ReportExecStatus( const string& err = Substitute("ReportExecStatus(): Received report for unknown " "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:" " $2 done: $3)", PrintId(params.query_id), - params.fragment_instance_idx, PrintId(params.fragment_instance_id), + params.instance_state_idx, PrintId(params.fragment_instance_id), params.done); return_val.status.error_msgs.push_back(err); // TODO: Re-enable logging when this only happens once per fragment. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 55dd838..cfe0080 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -284,25 +284,19 @@ struct TQueryCtx { // results returned from multiple scan nodes are consistent. // This defaults to -1 when no timestamp is specified. 11: optional i64 snapshot_timestamp = -1; -} - -// Context of a fragment instance, including its unique id, the total number -// of fragment instances, the query context, the coordinator address, etc. -struct TPlanFragmentInstanceCtx { - // context of the query this fragment instance belongs to - 1: required TQueryCtx query_ctx - // the globally unique fragment instance id - 2: required Types.TUniqueId fragment_instance_id + // Contains only the union of those descriptors referenced by list of fragments destined + // for a single host. Optional for frontend tests. 
+ 12: optional Descriptors.TDescriptorTable desc_tbl +} - // ordinal of this fragment instance, range [0, num_fragment_instances) - 3: required i32 per_fragment_instance_idx +// Context to collect information, which is shared among all instances of that plan +// fragment. +struct TPlanFragmentCtx { + 1: required Planner.TPlanFragment fragment // total number of instances of this fragment - 4: required i32 num_fragment_instances - - // Index of this fragment instance across all combined instances in this query. - 5: required i32 fragment_instance_idx + 2: required i32 num_fragment_instances } // A scan range plus the parameters needed to execute that scan. @@ -322,35 +316,53 @@ struct TPlanFragmentDestination { 2: required Types.TNetworkAddress server } -// Parameters for a single execution instance of a particular TPlanFragment +// Execution parameters of a fragment instance, including its unique id, the total number +// of fragment instances, the query context, the coordinator address, etc. // TODO: for range partitioning, we also need to specify the range boundaries -struct TPlanFragmentExecParams { - // initial scan ranges for each scan node in TPlanFragment.plan_tree - 1: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges +struct TPlanFragmentInstanceCtx { + // the globally unique fragment instance id + 1: required Types.TUniqueId fragment_instance_id + + // Index of this fragment instance across all instances of its parent fragment, + // range [0, TPlanFragmentCtx.num_fragment_instances). + 2: required i32 fragment_instance_idx + + // Index of this fragment instance in Coordinator::fragment_instance_states_. 
+ 3: required i32 instance_state_idx + + // Initial scan ranges for each scan node in TPlanFragment.plan_tree + 4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges - // number of senders for ExchangeNodes contained in TPlanFragment.plan_tree; + // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree; // needed to create a DataStreamRecvr - 2: required map<Types.TPlanNodeId, i32> per_exch_num_senders + 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders // Output destinations, one per output partition. // The partitioning of the output is specified by // TPlanFragment.output_sink.output_partition. // The number of output partitions is destinations.size(). - 3: list<TPlanFragmentDestination> destinations + 6: list<TPlanFragmentDestination> destinations // Debug options: perform some action in a particular phase of a particular node - 4: optional Types.TPlanNodeId debug_node_id - 5: optional PlanNodes.TExecNodePhase debug_phase - 6: optional PlanNodes.TDebugAction debug_action + 7: optional Types.TPlanNodeId debug_node_id + 8: optional PlanNodes.TExecNodePhase debug_phase + 9: optional PlanNodes.TDebugAction debug_action // The pool to which this request has been submitted. Used to update pool statistics // for admission control. - 7: optional string request_pool + 10: optional string request_pool // Id of this fragment in its role as a sender. - 8: optional i32 sender_id + 11: optional i32 sender_id + + // Resource reservation to run this plan fragment in. 
+ 12: optional Llama.TAllocatedResource reserved_resource + + // Address of local node manager (used for expanding resource allocations) + 13: optional Types.TNetworkAddress local_resource_address } + // Service Protocol Details enum ImpalaInternalServiceVersion { @@ -363,25 +375,15 @@ enum ImpalaInternalServiceVersion { struct TExecPlanFragmentParams { 1: required ImpalaInternalServiceVersion protocol_version - // required in V1 - 2: optional Planner.TPlanFragment fragment + // Context of the query, which this fragment is part of. + 2: optional TQueryCtx query_ctx - // required in V1 - // Contains only those descriptors referenced by fragment's scan nodes and data sink - 3: optional Descriptors.TDescriptorTable desc_tbl + // Context of this fragment. + 3: optional TPlanFragmentCtx fragment_ctx - // required in V1 - 4: optional TPlanFragmentExecParams params - - // Context of this fragment, including its instance id, the total number fragment - // instances, the query context, etc. - 5: optional TPlanFragmentInstanceCtx fragment_instance_ctx - - // Resource reservation to run this plan fragment in. - 6: optional Llama.TAllocatedResource reserved_resource - - // Address of local node manager (used for expanding resource allocations) - 7: optional Types.TNetworkAddress local_resource_address + // Context of this fragment instance, including its instance id, the total number + // of fragment instances, the query context, etc. + 4: optional TPlanFragmentInstanceCtx fragment_instance_ctx } struct TExecPlanFragmentResult { @@ -450,9 +452,10 @@ struct TReportExecStatusParams { // required in V1 2: optional Types.TUniqueId query_id - // passed into ExecPlanFragment() as TPlanFragmentInstanceCtx.backend_num // required in V1 - 3: optional i32 fragment_instance_idx + // Used to look up the fragment instance state in the coordinator, same value as + // TPlanFragmentInstanceCtx.instance_state_idx. 
+ 3: optional i32 instance_state_idx // required in V1 4: optional Types.TUniqueId fragment_instance_id http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/common/thrift/PlanNodes.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index d095047..6db725c 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -16,7 +16,7 @@ // This file contains all structs, enums, etc., that together make up // a plan tree. All information recorded in struct TPlan and below is independent // of the execution parameters of any one of the backends on which it is running -// (those are recorded in TPlanFragmentExecParams). +// (those are recorded in TPlanFragmentInstanceCtx). namespace cpp impala namespace java com.cloudera.impala.thrift
