http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index bede0ef..741da32 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -56,10 +56,13 @@ #include "runtime/hdfs-fs-cache.h" #include "runtime/mem-tracker.h" #include "runtime/parallel-executor.h" -#include "runtime/plan-fragment-executor.h" #include "runtime/query-exec-mgr.h" #include "runtime/row-batch.h" #include "runtime/tuple-row.h" +#include "runtime/coordinator-filter-state.h" +#include "runtime/coordinator-backend-state.h" +#include "runtime/debug-options.h" +#include "runtime/query-state.h" #include "scheduling/scheduler.h" #include "util/bloom-filter.h" #include "util/container-util.h" @@ -78,7 +81,6 @@ using namespace apache::thrift; using namespace strings; -namespace accumulators = boost::accumulators; using boost::algorithm::iequals; using boost::algorithm::is_any_of; using boost::algorithm::join; @@ -98,336 +100,17 @@ namespace impala { // Maximum number of fragment instances that can publish each broadcast filter. static const int MAX_BROADCAST_FILTER_PRODUCERS = 3; -// container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action, -// debug_phase) -struct DebugOptions { - int instance_state_idx; - int node_id; - TDebugAction::type action; - TExecNodePhase::type phase; // INVALID: debug options invalid - - DebugOptions() - : instance_state_idx(-1), node_id(-1), action(TDebugAction::WAIT), - phase(TExecNodePhase::INVALID) {} - - // If these debug options apply to the candidate fragment instance, returns true - // otherwise returns false. - bool IsApplicable(int candidate_instance_state_idx) { - if (phase == TExecNodePhase::INVALID) return false; - return (instance_state_idx == -1 || - instance_state_idx == candidate_instance_state_idx); - } -}; - -/// Execution state of a particular fragment instance. -/// -/// Concurrent accesses: -/// - updates through UpdateFragmentExecStatus() -class Coordinator::InstanceState { - public: - InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) - : exec_params_(params), - total_split_size_(0), - profile_(nullptr), - total_ranges_complete_(0), - rpc_latency_(0), - rpc_sent_(false), - done_(false), - profile_created_(false) { - const string& profile_name = Substitute("Instance $0 (host=$1)", - PrintId(params.instance_id), lexical_cast<string>(params.host)); - profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); - } - - /// Called to set the initial status of the fragment instance after the - /// ExecRemoteFragment() RPC has returned. If 'rpc_sent' is true, - /// CancelFragmentInstances() will include this instance in the set of potential - /// fragment instances to cancel. - void SetInitialStatus(const Status& status, bool rpc_sent) { - DCHECK(!rpc_sent_); - rpc_sent_ = rpc_sent; - status_ = status; - if (!status_.ok()) return; - stopwatch_.Start(); - } - - /// Computes sum of split sizes of leftmost scan. - void ComputeTotalSplitSize(const PerNodeScanRanges& per_node_scan_ranges); - - /// Updates the total number of scan ranges complete for this fragment. Returns the - /// delta since the last time this was called. Not thread-safe without lock() being - /// acquired by the caller. - int64_t UpdateNumScanRangesCompleted(); - - // The following getters do not require lock() to be held. 
- const TUniqueId& fragment_instance_id() const { return exec_params_.instance_id; } - FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; } - MonotonicStopWatch* stopwatch() { return &stopwatch_; } - const TNetworkAddress& impalad_address() const { return exec_params_.host; } - int64_t total_split_size() const { return total_split_size_; } - bool done() const { return done_; } - int per_fragment_instance_idx() const { return exec_params_.per_fragment_instance_idx; } - bool rpc_sent() const { return rpc_sent_; } - int64_t rpc_latency() const { return rpc_latency_; } - - mutex* lock() { return &lock_; } - - void set_status(const Status& status) { status_ = status; } - void set_done(bool done) { done_ = done; } - void set_rpc_latency(int64_t millis) { - DCHECK_EQ(rpc_latency_, 0); - rpc_latency_ = millis; - } - - // Return values of the following functions must be accessed with lock() held - RuntimeProfile* profile() const { return profile_; } - void set_profile(RuntimeProfile* profile) { profile_ = profile; } - FragmentInstanceCounters* aggregate_counters() { return &aggregate_counters_; } - ErrorLogMap* error_log() { return &error_log_; } - Status* status() { return &status_; } - - /// Registers that the fragment instance's profile has been created and initially - /// populated. Returns whether the profile had already been initialised so that callers - /// can tell if they are the first to do so. Not thread-safe. - bool SetProfileCreated() { - bool cur = profile_created_; - profile_created_ = true; - return cur; - } - - private: - const FInstanceExecParams& exec_params_; - - /// Wall clock timer for this fragment. - MonotonicStopWatch stopwatch_; - - /// Summed across all splits; in bytes. - int64_t total_split_size_; - - /// Protects fields below. Can be held while doing an RPC, so SpinLock is a bad idea. - /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_ - mutex lock_; - - /// If the status indicates an error status, execution of this fragment has either been - /// aborted by the executing impalad (which then reported the error) or cancellation has - /// been initiated; either way, execution must not be cancelled. - Status status_; - - /// Owned by coordinator object pool provided in the c'tor - RuntimeProfile* profile_; - - /// Errors reported by this fragment instance. - ErrorLogMap error_log_; - - /// Total scan ranges complete across all scan nodes. - int64_t total_ranges_complete_; - - /// Summary counters aggregated across the duration of execution. - FragmentInstanceCounters aggregate_counters_; - - /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC. - int64_t rpc_latency_; - - /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be - /// successful. - bool rpc_sent_; - - /// If true, execution terminated; do not cancel in that case. - bool done_; - - /// True after the first call to profile->Update() - bool profile_created_; -}; - -/// Represents a runtime filter target. -struct Coordinator::FilterTarget { - TPlanNodeId node_id; - bool is_local; - bool is_bound_by_partition_columns; - - // indices into fragment_instance_states_ - unordered_set<int> fragment_instance_state_idxs; - - FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) { - node_id = tFilterTarget.node_id; - is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns; - is_local = tFilterTarget.is_local_target; - } -}; - - -/// State of filters that are received for aggregation. 
-/// -/// A broadcast join filter is published as soon as the first update is received for it -/// and subsequent updates are ignored (as they will be the same). -/// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is -/// published once 'pending_count' reaches 0 and if the filter was not disabled before -/// that. -/// -/// A filter is disabled if an always_true filter update is received, an OOM is hit, -/// filter aggregation is complete or if the query is complete. -/// Once a filter is disabled, subsequent updates for that filter are ignored. -class Coordinator::FilterState { - public: - FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src) : desc_(desc), - src_(src), pending_count_(0), first_arrival_time_(0L), completion_time_(0L), - disabled_(false) { } - - TBloomFilter* bloom_filter() { return bloom_filter_.get(); } - boost::unordered_set<int>* src_fragment_instance_state_idxs() { - return &src_fragment_instance_state_idxs_; - } - const boost::unordered_set<int>& src_fragment_instance_state_idxs() const { - return src_fragment_instance_state_idxs_; - } - std::vector<FilterTarget>* targets() { return &targets_; } - const std::vector<FilterTarget>& targets() const { return targets_; } - int64_t first_arrival_time() const { return first_arrival_time_; } - int64_t completion_time() const { return completion_time_; } - const TPlanNodeId& src() const { return src_; } - const TRuntimeFilterDesc& desc() const { return desc_; } - int pending_count() const { return pending_count_; } - void set_pending_count(int pending_count) { pending_count_ = pending_count; } - bool disabled() const { return disabled_; } - - /// Aggregates partitioned join filters and updates memory consumption. - /// Disables filter if always_true filter is received or OOM is hit. - void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord); - - /// Disables a filter. A disabled filter consumes no memory. - void Disable(MemTracker* tracker); - - private: - /// Contains the specification of the runtime filter. - TRuntimeFilterDesc desc_; - - TPlanNodeId src_; - std::vector<FilterTarget> targets_; - - // Index into fragment_instance_states_ for source fragment instances. - boost::unordered_set<int> src_fragment_instance_state_idxs_; - - /// Number of remaining backends to hear from before filter is complete. - int pending_count_; - - /// BloomFilter aggregated from all source plan nodes, to be broadcast to all - /// destination plan fragment instances. Owned by this object so that it can be - /// deallocated once finished with. Only set for partitioned joins (broadcast joins - /// need no aggregation). - /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the - /// output structure in the case of a broadcast join. Similarly, for partitioned joins, - /// the filter is moved from the following member to the output structure. - std::unique_ptr<TBloomFilter> bloom_filter_; - - /// Time at which first local filter arrived. - int64_t first_arrival_time_; - - /// Time at which all local filters arrived. - int64_t completion_time_; - - /// True if the filter is permanently disabled for this query. - bool disabled_; - - /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_ - /// for every filter update. 
- -}; - -void Coordinator::InstanceState::ComputeTotalSplitSize( - const PerNodeScanRanges& per_node_scan_ranges) { - total_split_size_ = 0; - - for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) { - for (const TScanRangeParams& scan_range_params: entry.second) { - if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue; - total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length; - } - } -} - -int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() { - int64_t total = 0; - CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters; - for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) { - total += i->second->value(); - } - int64_t delta = total - total_ranges_complete_; - total_ranges_complete_ = total; - DCHECK_GE(delta, 0); - return delta; -} - -Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, - RuntimeProfile::EventSequence* events) +Coordinator::Coordinator( + const QuerySchedule& schedule, RuntimeProfile::EventSequence* events) : schedule_(schedule), - exec_env_(exec_env), - has_called_wait_(false), - returned_all_results_(false), - query_state_(nullptr), - num_remaining_fragment_instances_(0), - obj_pool_(new ObjectPool()), - query_events_(events), - filter_routing_table_complete_(false), filter_mode_(schedule.query_options().runtime_filter_mode), - torn_down_(false) {} + obj_pool_(new ObjectPool()), + query_events_(events) {} Coordinator::~Coordinator() { DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed"; } -PlanFragmentExecutor* Coordinator::executor() { - return (coord_instance_ == nullptr) ? nullptr : coord_instance_->executor(); -} - -TExecNodePhase::type GetExecNodePhase(const string& key) { - map<int, const char*>::const_iterator entry = - _TExecNodePhase_VALUES_TO_NAMES.begin(); - for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) { - if (iequals(key, (*entry).second)) { - return static_cast<TExecNodePhase::type>(entry->first); - } - } - return TExecNodePhase::INVALID; -} - -TDebugAction::type GetDebugAction(const string& key) { - map<int, const char*>::const_iterator entry = - _TDebugAction_VALUES_TO_NAMES.begin(); - for (; entry != _TDebugAction_VALUES_TO_NAMES.end(); ++entry) { - if (iequals(key, (*entry).second)) { - return static_cast<TDebugAction::type>(entry->first); - } - } - return TDebugAction::WAIT; -} - -static void ProcessQueryOptions( - const TQueryOptions& query_options, DebugOptions* debug_options) { - DCHECK(debug_options != NULL); - if (!query_options.__isset.debug_action || query_options.debug_action.empty()) { - debug_options->phase = TExecNodePhase::INVALID; // signal not set - return; - } - vector<string> components; - split(components, query_options.debug_action, is_any_of(":"), token_compress_on); - if (components.size() < 3 || components.size() > 4) return; - if (components.size() == 3) { - debug_options->instance_state_idx = -1; - debug_options->node_id = atoi(components[0].c_str()); - debug_options->phase = GetExecNodePhase(components[1]); - debug_options->action = GetDebugAction(components[2]); - } else { - debug_options->instance_state_idx = atoi(components[0].c_str()); - debug_options->node_id = atoi(components[1].c_str()); - debug_options->phase = GetExecNodePhase(components[2]); - debug_options->action = GetDebugAction(components[3]); - } - DCHECK(!(debug_options->phase == TExecNodePhase::CLOSE && - debug_options->action == TDebugAction::WAIT)) - << "Do not use CLOSE:WAIT debug 
actions " - << "because nodes cannot be cancelled in Close()"; -} - Status Coordinator::Exec() { const TQueryExecRequest& request = schedule_.request(); DCHECK(request.plan_exec_info.size() > 0); @@ -438,19 +121,21 @@ Status Coordinator::Exec() { VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() << " stmt=" << request.query_ctx.client_request.stmt; stmt_type_ = request.stmt_type; - query_id_ = schedule_.query_id(); - desc_tbl_ = request.desc_tbl; query_ctx_ = request.query_ctx; + // set descriptor table here globally + // TODO: remove TQueryExecRequest.desc_tbl + query_ctx_.__set_desc_tbl(request.desc_tbl); + query_ctx_.__set_request_pool(schedule_.request_pool()); query_profile_.reset( - new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id_))); + new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id()))); finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer"); filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT); SCOPED_TIMER(query_profile_->total_time_counter()); // initialize progress updater - const string& str = Substitute("Query $0", PrintId(query_id_)); + const string& str = Substitute("Query $0", PrintId(query_id())); progress_.Init(str, schedule_.num_scan_ranges()); // runtime filters not yet supported for mt execution @@ -460,182 +145,279 @@ Status Coordinator::Exec() { // to keep things simple, make async Cancel() calls wait until plan fragment // execution has been initiated, otherwise we might try to cancel fragment // execution at Impala daemons where it hasn't even started + // TODO: revisit this, it may not be true anymore lock_guard<mutex> l(lock_); - query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState( - query_ctx_, schedule_.request_pool()); + query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_); filter_mem_tracker_.reset(new MemTracker( -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false)); - InitExecProfiles(); - InitExecSummary(); - StartFInstances(); + InitFragmentStats(); + // create BackendStates and per-instance state, including profiles, and install + // the latter in the FragmentStats' root profile + InitBackendStates(); + exec_summary_.Init(schedule_); - // In the error case, it's safe to return and not to get coord_sink_ here to close - if - // there was an error, but the coordinator fragment was successfully started, it should - // cancel itself when it receives an error status after reporting its profile. - RETURN_IF_ERROR(FinishInstanceStartup()); + // TODO-MT: populate the runtime filter routing table + // This requires local aggregation of filters prior to sending + // for broadcast joins in order to avoid more complicated merge logic here. - // Grab executor and wait until Prepare() has finished so that runtime state etc. will - // be set up. Must do this here in order to get a reference to coord_instance_ - // so that coord_sink_ remains valid throughout query lifetime. + if (filter_mode_ != TRuntimeFilterMode::OFF) { + DCHECK_EQ(request.plan_exec_info.size(), 1); + // Populate the runtime filter routing table. This should happen before starting the + // fragment instances. This code anticipates the indices of the instance states + // created later on in ExecRemoteFragment() + InitFilterRoutingTable(); + } + + // At this point, all static setup is done and all structures are initialized. 
+ // Only runtime-related state changes past this point (examples: + // num_remaining_backends_, fragment instance profiles, etc.) + + StartBackendExec(); + RETURN_IF_ERROR(FinishBackendStartup()); + + // set coord_instance_ and coord_sink_ if (schedule_.GetCoordFragment() != nullptr) { - coord_instance_ = query_state_->GetFInstanceState(query_id_); + // this blocks until all fragment instances have finished their Prepare phase + coord_instance_ = query_state_->GetFInstanceState(query_id()); if (coord_instance_ == nullptr) { - // Coordinator instance might have failed and unregistered itself even - // though it was successfully started (e.g. Prepare() might have failed). - InstanceState* coord_state = fragment_instance_states_[0]; - DCHECK(coord_state != nullptr); - lock_guard<mutex> instance_state_lock(*coord_state->lock()); - // Try and return the fragment instance status if it was already set. - // TODO: Consider waiting for coord_state->done() here. - RETURN_IF_ERROR(*coord_state->status()); - return Status( - Substitute("Coordinator fragment instance ($0) failed", PrintId(query_id_))); + // at this point, the query is done with the Prepare phase, and we expect + // to have a coordinator instance, but coord_instance_ == nullptr, + // which means we failed Prepare + Status prepare_status = query_state_->WaitForPrepare(); + DCHECK(!prepare_status.ok()); + return prepare_status; } - // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At - // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the + // When GetFInstanceState() returns the coordinator instance, the Prepare phase + // is done and the FragmentInstanceState's root sink will be set up. At that point, + // the coordinator must be sure to call root_sink()->CloseConsumer(); the // fragment instance's executor will not complete until that point. + // TODO: what does this mean? // TODO: Consider moving this to Wait(). - Status prepare_status = executor()->WaitForPrepare(); - coord_sink_ = executor()->root_sink(); - RETURN_IF_ERROR(prepare_status); + // TODO: clarify need for synchronization on this event + DCHECK(coord_instance_->IsPrepared() && coord_instance_->WaitForPrepare().ok()); + coord_sink_ = coord_instance_->root_sink(); DCHECK(coord_sink_ != nullptr); } - PrintFragmentInstanceInfo(); return Status::OK(); } -void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) { +void Coordinator::InitFragmentStats() { + vector<const TPlanFragment*> fragments; + schedule_.GetTPlanFragments(&fragments); + const TPlanFragment* coord_fragment = schedule_.GetCoordFragment(); + + for (const TPlanFragment* fragment: fragments) { + string root_profile_name = + Substitute( + fragment == coord_fragment ? "Coordinator Fragment $0" : "Fragment $0", + fragment->display_name); + string avg_profile_name = + Substitute("Averaged Fragment $0", fragment->display_name); + int num_instances = + schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size(); + // TODO: special-case the coordinator fragment? 
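// For orientation, the profile tree this loop builds (abbreviated; the
// per-instance rows are installed later by BackendState::Init(), and the
// "Instance <id> (host=...)" naming is carried over from the old
// InstanceState, so treat that detail as an assumption):
//
//   Execution Profile <query_id>
//     Averaged Fragment F00
//     Coordinator Fragment F00        <- root_profile() of the coord fragment
//     Averaged Fragment F01
//     Fragment F01                    <- root_profile()
//       Instance <id> (host=<addr>)   <- one child per fragment instance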
+ FragmentStats* fragment_stats = obj_pool()->Add( + new FragmentStats( + avg_profile_name, root_profile_name, num_instances, obj_pool())); + fragment_stats_.push_back(fragment_stats); + query_profile_->AddChild(fragment_stats->avg_profile(), true); + query_profile_->AddChild(fragment_stats->root_profile()); + } +} + +void Coordinator::InitBackendStates() { + int num_backends = schedule_.unique_hosts().size(); + DCHECK_GT(num_backends, 0); + backend_states_.resize(num_backends); + + // collect the FInstanceExecParams for each host + typedef map<TNetworkAddress, vector<const FInstanceExecParams*>> BackendParamsMap; + BackendParamsMap backend_params_map; + for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { + for (const FInstanceExecParams& instance_params: + fragment_params.instance_exec_params) { + backend_params_map[instance_params.host].push_back(&instance_params); + } + } + + // create BackendStates + bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr; + const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address(); + int backend_idx = 0; + for (const auto& entry: backend_params_map) { + if (has_coord_fragment && coord_address == entry.first) { + coord_backend_idx_ = backend_idx; + } + BackendState* backend_state = obj_pool()->Add( + new BackendState(query_id(), backend_idx, filter_mode_)); + backend_state->Init(entry.second, fragment_stats_, obj_pool()); + backend_states_[backend_idx++] = backend_state; + } + DCHECK(!has_coord_fragment || coord_backend_idx_ != -1); +} + +void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) { + const TQueryExecRequest& request = schedule.request(); + // init exec_summary_.{nodes, exch_to_sender_map} + thrift_exec_summary.__isset.nodes = true; + DCHECK(thrift_exec_summary.nodes.empty()); + for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + if (!fragment.__isset.plan) continue; + + // eventual index of fragment's root node in exec_summary_.nodes + int root_node_idx = thrift_exec_summary.nodes.size(); + + const TPlan& plan = fragment.plan; + int num_instances = + schedule.GetFragmentExecParams(fragment.idx).instance_exec_params.size(); + for (const TPlanNode& node: plan.nodes) { + node_id_to_idx_map[node.node_id] = thrift_exec_summary.nodes.size(); + thrift_exec_summary.nodes.emplace_back(); + TPlanNodeExecSummary& node_summary = thrift_exec_summary.nodes.back(); + node_summary.__set_node_id(node.node_id); + node_summary.__set_fragment_idx(fragment.idx); + node_summary.__set_label(node.label); + node_summary.__set_label_detail(node.label_detail); + node_summary.__set_num_children(node.num_children); + if (node.__isset.estimated_stats) { + node_summary.__set_estimated_stats(node.estimated_stats); + } + node_summary.exec_stats.resize(num_instances); + } + + if (fragment.__isset.output_sink + && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) { + const TDataStreamSink& sink = fragment.output_sink.stream_sink; + int exch_idx = node_id_to_idx_map[sink.dest_node_id]; + if (sink.output_partition.type == TPartitionType::UNPARTITIONED) { + thrift_exec_summary.nodes[exch_idx].__set_is_broadcast(true); + } + thrift_exec_summary.__isset.exch_to_sender_map = true; + thrift_exec_summary.exch_to_sender_map[exch_idx] = root_node_idx; + } + } + } +} + +void Coordinator::InitFilterRoutingTable() { DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0); - int num_hosts = 
fragment_params.instance_exec_params.size(); - DCHECK_GT(num_hosts, 0); DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) - << "UpdateFilterRoutingTable() called although runtime filters are disabled"; + << "InitFilterRoutingTable() called although runtime filters are disabled"; DCHECK(!filter_routing_table_complete_) - << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_"; + << "InitFilterRoutingTable() called after setting filter_routing_table_complete_"; - for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) { - if (!plan_node.__isset.runtime_filters) continue; - for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) { - if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) { - continue; - } - FilterRoutingTable::iterator i = filter_routing_table_.emplace( - filter.filter_id, FilterState(filter, plan_node.node_id)).first; - FilterState* f = &(i->second); - if (plan_node.__isset.hash_join_node) { - // Set the 'pending_count_' to zero to indicate that for a filter with local-only - // targets the coordinator does not expect to receive any filter updates. - int pending_count = filter.is_broadcast_join ? - (filter.has_remote_targets ? 1 : 0) : num_hosts; - f->set_pending_count(pending_count); - vector<int> src_idxs = fragment_params.GetInstanceIdxs(); - - // If this is a broadcast join with only non-local targets, build and publish it - // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join - // or it is a broadcast join with local targets, it should be generated - // everywhere the join is executed. - if (filter.is_broadcast_join && !filter.has_local_targets - && num_hosts > MAX_BROADCAST_FILTER_PRODUCERS) { - random_shuffle(src_idxs.begin(), src_idxs.end()); - src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS); - } - f->src_fragment_instance_state_idxs()->insert(src_idxs.begin(), src_idxs.end()); - } else if (plan_node.__isset.hdfs_scan_node) { - auto it = filter.planid_to_target_ndx.find(plan_node.node_id); - DCHECK(it != filter.planid_to_target_ndx.end()); - const TRuntimeFilterTargetDesc& tFilterTarget = filter.targets[it->second]; - if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) { + for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { + int num_instances = fragment_params.instance_exec_params.size(); + DCHECK_GT(num_instances, 0); + + for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) { + if (!plan_node.__isset.runtime_filters) continue; + for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) { + if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) { continue; } - vector<int> idxs = fragment_params.GetInstanceIdxs(); - FilterTarget target(tFilterTarget); - target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end()); - f->targets()->push_back(target); - } else { - DCHECK(false) << "Unexpected plan node with runtime filters: " - << ThriftDebugString(plan_node); + FilterRoutingTable::iterator i = filter_routing_table_.emplace( + filter.filter_id, FilterState(filter, plan_node.node_id)).first; + FilterState* f = &(i->second); + + // source plan node of filter + if (plan_node.__isset.hash_join_node) { + // Set the 'pending_count_' to zero to indicate that for a filter with + // local-only targets the coordinator does not expect to receive any filter + // updates. + int pending_count = filter.is_broadcast_join + ? (filter.has_remote_targets ? 
1 : 0) : num_instances; + f->set_pending_count(pending_count); + + // determine source instances + // TODO: store this in FInstanceExecParams, not in FilterState + vector<int> src_idxs = fragment_params.GetInstanceIdxs(); + + // If this is a broadcast join with only non-local targets, build and publish it + // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join + // or it is a broadcast join with local targets, it should be generated + // everywhere the join is executed. + if (filter.is_broadcast_join && !filter.has_local_targets + && num_instances > MAX_BROADCAST_FILTER_PRODUCERS) { + random_shuffle(src_idxs.begin(), src_idxs.end()); + src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS); + } + f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end()); + + // target plan node of filter + } else if (plan_node.__isset.hdfs_scan_node) { + auto it = filter.planid_to_target_ndx.find(plan_node.node_id); + DCHECK(it != filter.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second]; + if (filter_mode_ == TRuntimeFilterMode::LOCAL && !t_target.is_local_target) { + continue; + } + f->targets()->emplace_back(t_target, fragment_params.fragment.idx); + } else { + DCHECK(false) << "Unexpected plan node with runtime filters: " + << ThriftDebugString(plan_node); + } } } } -} -void Coordinator::StartFInstances() { - int num_fragment_instances = schedule_.GetNumFragmentInstances(); - DCHECK_GT(num_fragment_instances, 0); - - fragment_instance_states_.resize(num_fragment_instances); - exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances)); - num_remaining_fragment_instances_ = num_fragment_instances; + query_profile_->AddInfoString( + "Number of filters", Substitute("$0", filter_routing_table_.size())); + query_profile_->AddInfoString("Filter routing table", FilterDebugString()); + if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString(); + filter_routing_table_complete_ = true; +} - DebugOptions debug_options; - ProcessQueryOptions(schedule_.query_options(), &debug_options); - const TQueryExecRequest& request = schedule_.request(); +void Coordinator::StartBackendExec() { + int num_backends = backend_states_.size(); + exec_complete_barrier_.reset(new CountingBarrier(num_backends)); + num_remaining_backends_ = num_backends; - VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " - << query_id_; - query_events_->MarkEvent( - Substitute("Ready to start $0 fragment instances", num_fragment_instances)); + DebugOptions debug_options(schedule_.query_options()); - // TODO-MT: populate the runtime filter routing table - // This requires local aggregation of filters prior to sending - // for broadcast joins in order to avoid more complicated merge logic here. + VLOG_QUERY << "starting execution on " << num_backends << " backends for query " + << query_id(); + query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends)); - if (filter_mode_ != TRuntimeFilterMode::OFF) { - DCHECK_EQ(request.plan_exec_info.size(), 1); - // Populate the runtime filter routing table. This should happen before starting the - // fragment instances. 
This code anticipates the indices of the instance states - // created later on in ExecRemoteFragment() - for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { - UpdateFilterRoutingTable(fragment_params); - } - MarkFilterRoutingTableComplete(); + for (BackendState* backend_state: backend_states_) { + ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer( + [backend_state, this, &debug_options]() { + backend_state->Exec(query_ctx_, debug_options, filter_routing_table_, + exec_complete_barrier_.get()); + }); } - int num_instances = 0; - for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { - num_instances += fragment_params.instance_exec_params.size(); - for (const FInstanceExecParams& instance_params: - fragment_params.instance_exec_params) { - InstanceState* exec_state = obj_pool()->Add( - new InstanceState(instance_params, obj_pool())); - int instance_state_idx = GetInstanceIdx(instance_params.instance_id); - fragment_instance_states_[instance_state_idx] = exec_state; - - DebugOptions* instance_debug_options = - debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL; - exec_env_->fragment_exec_thread_pool()->Offer( - std::bind(&Coordinator::ExecRemoteFInstance, - this, std::cref(instance_params), instance_debug_options)); - } - } exec_complete_barrier_->Wait(); - VLOG_QUERY << "started " << num_fragment_instances << " fragment instances for query " - << query_id_; + VLOG_QUERY << "started execution on " << num_backends << " backends for query " + << query_id(); query_events_->MarkEvent( - Substitute("All $0 fragment instances started", num_instances)); + Substitute("All $0 execution backends ($1 fragment instances) started", + num_backends, schedule_.GetNumFragmentInstances())); } -Status Coordinator::FinishInstanceStartup() { +Status Coordinator::FinishBackendStartup() { Status status = Status::OK(); const TMetricDef& def = - MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); + MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); HistogramMetric latencies(def, 20000, 3); - for (InstanceState* exec_state: fragment_instance_states_) { - lock_guard<mutex> l(*exec_state->lock()); - // Preserve the first non-OK status, if there is one - if (status.ok()) status = *exec_state->status(); - latencies.Update(exec_state->rpc_latency()); + for (BackendState* backend_state: backend_states_) { + // preserve the first non-OK, if there is one + Status backend_status = backend_state->GetStatus(); + if (!backend_status.ok() && status.ok()) status = backend_status; + latencies.Update(backend_state->rpc_latency()); } query_profile_->AddInfoString( - "Fragment instance start latencies", latencies.ToHumanReadable()); + "Backend startup latencies", latencies.ToHumanReadable()); if (!status.ok()) { - DCHECK(query_status_.ok()); // nobody should have been able to cancel + // TODO: do not allow cancellation via the debug page until Exec() has returned + //DCHECK(query_status_.ok()); // nobody should have been able to cancel query_status_ = status; CancelInternal(); } @@ -647,7 +429,6 @@ string Coordinator::FilterDebugString() { table_printer.AddColumn("ID", false); table_printer.AddColumn("Src. Node", false); table_printer.AddColumn("Tgt. 
Node(s)", false); - table_printer.AddColumn("Targets", false); table_printer.AddColumn("Target type", false); table_printer.AddColumn("Partition filter", false); @@ -665,25 +446,21 @@ string Coordinator::FilterDebugString() { row.push_back(lexical_cast<string>(v.first)); row.push_back(lexical_cast<string>(state.src())); vector<string> target_ids; - vector<string> num_target_instances; vector<string> target_types; vector<string> partition_filter; for (const FilterTarget& target: state.targets()) { target_ids.push_back(lexical_cast<string>(target.node_id)); - num_target_instances.push_back( - lexical_cast<string>(target.fragment_instance_state_idxs.size())); target_types.push_back(target.is_local ? "LOCAL" : "REMOTE"); partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false"); } row.push_back(join(target_ids, ", ")); - row.push_back(join(num_target_instances, ", ")); row.push_back(join(target_types, ", ")); row.push_back(join(partition_filter, ", ")); if (filter_mode_ == TRuntimeFilterMode::GLOBAL) { int pending_count = state.completion_time() != 0L ? 0 : state.pending_count(); row.push_back(Substitute("$0 ($1)", pending_count, - state.src_fragment_instance_state_idxs().size())); + state.src_fragment_instance_idxs().size())); if (state.first_arrival_time() == 0L) { row.push_back("N/A"); } else { @@ -704,16 +481,6 @@ string Coordinator::FilterDebugString() { return Substitute("\n$0", table_printer.ToString()); } -void Coordinator::MarkFilterRoutingTableComplete() { - DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) - << "MarkFilterRoutingTableComplete() called although runtime filters are disabled"; - query_profile_->AddInfoString( - "Number of filters", Substitute("$0", filter_routing_table_.size())); - query_profile_->AddInfoString("Filter routing table", FilterDebugString()); - if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString(); - filter_routing_table_complete_ = true; -} - Status Coordinator::GetStatus() { lock_guard<mutex> l(lock_); return query_status_; @@ -724,7 +491,7 @@ Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance { lock_guard<mutex> l(lock_); - // The query is done and we are just waiting for fragment instances to clean up. + // The query is done and we are just waiting for backends to clean up. // Ignore their cancelled updates. if (returned_all_results_ && status.IsCancelled()) return query_status_; @@ -738,8 +505,8 @@ Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance CancelInternal(); } - // Log the id of the fragment that first failed so we can track it down easier. - VLOG_QUERY << "Query id=" << query_id_ << " failed because fragment id=" + // Log the id of the fragment that first failed so we can track it down more easily. + VLOG_QUERY << "Query id=" << query_id() << " failed because instance id=" << instance_id << " on host=" << instance_hostname << " failed."; return query_status_; @@ -789,7 +556,7 @@ void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str, PermissionCache::const_iterator it = permissions_cache->find(path); if (it == permissions_cache->end()) { hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str()); - if (info != NULL) { + if (info != nullptr) { // File exists, so fill the cache with its current permissions. permissions_cache->insert( make_pair(path, make_pair(false, info->mPermissions))); @@ -815,11 +582,15 @@ Status Coordinator::FinalizeSuccessfulInsert() { // 1. If OVERWRITE, remove all the files in the target directory // 2. 
Create all the necessary partition directories. DescriptorTbl* descriptor_table; - DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table); + // TODO: add DescriptorTbl::CreateTableDescriptor() so we can create a + // descriptor for just the output table, calling Create() can be very + // expensive. + DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, nullptr, &descriptor_table); HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>( descriptor_table->GetTableDescriptor(finalize_params_.table_id)); - DCHECK(hdfs_table != NULL) << "INSERT target table not known in descriptor table: " - << finalize_params_.table_id; + DCHECK(hdfs_table != nullptr) + << "INSERT target table not known in descriptor table: " + << finalize_params_.table_id; // Loop over all partitions that were updated by this insert, and create the set of // filesystem operations required to create the correct partition structure on disk. @@ -843,9 +614,9 @@ Status Coordinator::FinalizeSuccessfulInsert() { part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first; } else { HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id); - DCHECK(part != NULL) << "table_id=" << hdfs_table->id() - << " partition_id=" << partition.second.id - << "\n" << PrintThrift(runtime_state()->instance_ctx()); + DCHECK(part != nullptr) + << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id + << "\n" << PrintThrift(runtime_state()->instance_ctx()); part_path_ss << part->location(); } const string& part_path = part_path_ss.str(); @@ -869,15 +640,15 @@ Status Coordinator::FinalizeSuccessfulInsert() { errno = 0; hdfsFileInfo* existing_files = hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); - if (existing_files == NULL && errno == EAGAIN) { + if (existing_files == nullptr && errno == EAGAIN) { errno = 0; existing_files = hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); } - // hdfsListDirectory() returns NULL not only when there is an error but also + // hdfsListDirectory() returns nullptr not only when there is an error but also // when the directory is empty(HDFS-8407). Need to check errno to make sure // the call fails. - if (existing_files == NULL && errno != 0) { + if (existing_files == nullptr && errno != 0) { return GetHdfsErrorMsg("Could not list directory: ", part_path); } for (int i = 0; i < num_files; ++i) { @@ -927,7 +698,8 @@ Status Coordinator::FinalizeSuccessfulInsert() { { SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", "FinalizationTimer")); - if (!partition_create_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { + if (!partition_create_ops.Execute( + ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) { // It's ok to ignore errors creating the directories, since they may already // exist. If there are permission errors, we'll run into them later. @@ -962,7 +734,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { { SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer")); - if (!move_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { + if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { stringstream ss; ss << "Error(s) moving partition files. 
First error (of " << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second; @@ -974,7 +746,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { { SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer", "FinalizationTimer")); - if (!dir_deletion_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { + if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { stringstream ss; ss << "Error(s) deleting staging directories. First error (of " << dir_deletion_ops.errors().size() << ") was: " @@ -996,7 +768,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { chmod_ops.Add(CHMOD, perm.first, permissions); } } - if (!chmod_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { + if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) { stringstream ss; ss << "Error(s) setting permissions on newly created partition directories. First" << " error (of " << chmod_ops.errors().size() << ") was: " @@ -1015,7 +787,7 @@ Status Coordinator::FinalizeQuery() { DCHECK(has_called_wait_); DCHECK(needs_finalization_); - VLOG_QUERY << "Finalizing query: " << query_id_; + VLOG_QUERY << "Finalizing query: " << query_id(); SCOPED_TIMER(finalization_timer_); Status return_status = GetStatus(); if (return_status.ok()) { @@ -1024,7 +796,7 @@ Status Coordinator::FinalizeQuery() { stringstream staging_dir; DCHECK(finalize_params_.__isset.staging_dir); - staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id_,"_") << "/"; + staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id(),"_") << "/"; hdfsFS hdfs_conn; RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn)); @@ -1034,17 +806,17 @@ Status Coordinator::FinalizeQuery() { return return_status; } -Status Coordinator::WaitForAllInstances() { +Status Coordinator::WaitForBackendCompletion() { unique_lock<mutex> l(lock_); - while (num_remaining_fragment_instances_ > 0 && query_status_.ok()) { - VLOG_QUERY << "Coordinator waiting for fragment instances to finish, " - << num_remaining_fragment_instances_ << " remaining"; - instance_completion_cv_.wait(l); + while (num_remaining_backends_ > 0 && query_status_.ok()) { + VLOG_QUERY << "Coordinator waiting for backends to finish, " + << num_remaining_backends_ << " remaining"; + backend_completion_cv_.wait(l); } if (query_status_.ok()) { - VLOG_QUERY << "All fragment instances finished successfully."; + VLOG_QUERY << "All backends finished successfully."; } else { - VLOG_QUERY << "All fragment instances finished due to one or more errors. " + VLOG_QUERY << "All backends finished due to one or more errors. " << query_status_.GetDetail(); } @@ -1058,9 +830,9 @@ Status Coordinator::Wait() { has_called_wait_ = true; if (stmt_type_ == TStmtType::QUERY) { - DCHECK(executor() != nullptr); - return UpdateStatus(executor()->WaitForOpen(), runtime_state()->fragment_instance_id(), - FLAGS_hostname); + DCHECK(coord_instance_ != nullptr); + return UpdateStatus(coord_instance_->WaitForOpen(), + runtime_state()->fragment_instance_id(), FLAGS_hostname); } DCHECK_EQ(stmt_type_, TStmtType::DML); @@ -1070,7 +842,7 @@ Status Coordinator::Wait() { // fragment which will be available after Open() returns. // Ignore the returned status if finalization is required., since FinalizeQuery() will // pick it up and needs to execute regardless. 
- Status status = WaitForAllInstances(); + Status status = WaitForBackendCompletion(); if (!needs_finalization_ && !status.ok()) return status; // Query finalization is required only for HDFS table sinks @@ -1078,16 +850,14 @@ Status Coordinator::Wait() { query_profile_->AddInfoString( "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n")); - // For DML queries, when Wait is done, the query is complete. Report aggregate - // query profiles at this point. - // TODO: make sure ReportQuerySummary gets called on error - ReportQuerySummary(); + // For DML queries, when Wait is done, the query is complete. + ComputeQuerySummary(); return status; } Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { - VLOG_ROW << "GetNext() query_id=" << query_id_; + VLOG_ROW << "GetNext() query_id=" << query_id(); DCHECK(has_called_wait_); SCOPED_TIMER(query_profile_->total_time_counter()); @@ -1112,7 +882,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { if (*eos) { returned_all_results_ = true; // Trigger tear-down of coordinator fragment by closing the consumer. Must do before - // WaitForAllInstances(). + // WaitForBackendCompletion(). coord_sink_->CloseConsumer(); coord_sink_ = nullptr; @@ -1120,211 +890,23 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { // all instances to complete before ultimately signalling the end of execution via a // NULL batch. After NULL is returned, the coordinator may tear down query state, and // perform post-query finalization which might depend on the reports from all - // instances. + // backends. // // TODO: Waiting should happen in TearDown() (and then we wouldn't need to call // CloseConsumer() here). See IMPALA-4275 for details. - RETURN_IF_ERROR(WaitForAllInstances()); - if (query_status_.ok()) { - // If the query completed successfully, report aggregate query profiles. - ReportQuerySummary(); - } + RETURN_IF_ERROR(WaitForBackendCompletion()); + // if the query completed successfully, compute the summary + if (query_status_.ok()) ComputeQuerySummary(); } return Status::OK(); } -void Coordinator::PrintFragmentInstanceInfo() { - for (InstanceState* state: fragment_instance_states_) { - SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned; - acc(state->total_split_size()); - } - - for (int id = (executor() == NULL ? 
0 : 1); id < fragment_profiles_.size(); ++id) { - SummaryStats& acc = fragment_profiles_[id].bytes_assigned; - double min = accumulators::min(acc); - double max = accumulators::max(acc); - double mean = accumulators::mean(acc); - double stddev = sqrt(accumulators::variance(acc)); - stringstream ss; - ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES) - << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES) - << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES) - << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES); - fragment_profiles_[id].averaged_profile->AddInfoString("split sizes", ss.str()); - - if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Byte split for fragment " << id << " " << ss.str(); - for (InstanceState* exec_state: fragment_instance_states_) { - if (exec_state->fragment_idx() != id) continue; - VLOG_FILE << "data volume for ipaddress " << exec_state << ": " - << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES); - } - } - } -} - -void Coordinator::InitExecSummary() { - const TQueryExecRequest& request = schedule_.request(); - // init exec_summary_.{nodes, exch_to_sender_map} - exec_summary_.__isset.nodes = true; - DCHECK(exec_summary_.nodes.empty()); - for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) { - for (const TPlanFragment& fragment: plan_exec_info.fragments) { - if (!fragment.__isset.plan) continue; - - // eventual index of fragment's root node in exec_summary_.nodes - int root_node_idx = exec_summary_.nodes.size(); - - const TPlan& plan = fragment.plan; - int num_instances = - schedule_.GetFragmentExecParams(fragment.idx).instance_exec_params.size(); - for (const TPlanNode& node: plan.nodes) { - plan_node_id_to_summary_map_[node.node_id] = exec_summary_.nodes.size(); - exec_summary_.nodes.emplace_back(); - TPlanNodeExecSummary& node_summary = exec_summary_.nodes.back(); - node_summary.__set_node_id(node.node_id); - node_summary.__set_fragment_idx(fragment.idx); - node_summary.__set_label(node.label); - node_summary.__set_label_detail(node.label_detail); - node_summary.__set_num_children(node.num_children); - if (node.__isset.estimated_stats) { - node_summary.__set_estimated_stats(node.estimated_stats); - } - node_summary.exec_stats.resize(num_instances); - } - - if (fragment.__isset.output_sink - && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) { - const TDataStreamSink& sink = fragment.output_sink.stream_sink; - int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id]; - if (sink.output_partition.type == TPartitionType::UNPARTITIONED) { - exec_summary_.nodes[exch_idx].__set_is_broadcast(true); - } - exec_summary_.__isset.exch_to_sender_map = true; - exec_summary_.exch_to_sender_map[exch_idx] = root_node_idx; - } - } - } -} - -void Coordinator::InitExecProfiles() { - vector<const TPlanFragment*> fragments; - schedule_.GetTPlanFragments(&fragments); - fragment_profiles_.resize(fragments.size()); - - const TPlanFragment* coord_fragment = schedule_.GetCoordFragment(); - - // Initialize the runtime profile structure. This adds the per fragment average - // profiles followed by the per fragment instance profiles. - for (const TPlanFragment* fragment: fragments) { - string profile_name = - (fragment == coord_fragment) ? 
"Coordinator Fragment $0" : "Fragment $0"; - PerFragmentProfileData* data = &fragment_profiles_[fragment->idx]; - data->num_instances = - schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size(); - // TODO-MT: stop special-casing the coordinator fragment - if (fragment != coord_fragment) { - data->averaged_profile = obj_pool()->Add(new RuntimeProfile( - obj_pool(), Substitute("Averaged Fragment $0", fragment->display_name), true)); - query_profile_->AddChild(data->averaged_profile, true); - } - data->root_profile = obj_pool()->Add( - new RuntimeProfile(obj_pool(), Substitute(profile_name, fragment->display_name))); - // Note: we don't start the wall timer here for the fragment profile; - // it's uninteresting and misleading. - query_profile_->AddChild(data->root_profile); - } -} - -void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, - FragmentInstanceCounters* counters) { - vector<RuntimeProfile*> children; - profile->GetAllChildren(&children); - for (RuntimeProfile* p: children) { - PlanNodeId id = ExecNode::GetNodeIdFromProfile(p); - - // This profile is not for an exec node. - if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue; - - RuntimeProfile::Counter* throughput_counter = - p->GetCounter(ScanNode::TOTAL_THROUGHPUT_COUNTER); - if (throughput_counter != NULL) { - counters->throughput_counters[id] = throughput_counter; - } - RuntimeProfile::Counter* scan_ranges_counter = - p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER); - if (scan_ranges_counter != NULL) { - counters->scan_ranges_complete_counters[id] = scan_ranges_counter; - } - } -} - -void Coordinator::ExecRemoteFInstance( - const FInstanceExecParams& exec_params, const DebugOptions* debug_options) { - NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); - TExecPlanFragmentParams rpc_params; - SetExecPlanFragmentParams(exec_params, &rpc_params); - if (debug_options != NULL) { - rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); - rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); - rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); - } - int instance_state_idx = GetInstanceIdx(exec_params.instance_id); - InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; - exec_state->ComputeTotalSplitSize( - rpc_params.fragment_instance_ctx.per_node_scan_ranges); - VLOG_FILE << "making rpc: ExecPlanFragment" - << " host=" << exec_state->impalad_address() - << " instance_id=" << PrintId(exec_state->fragment_instance_id()); - - // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns. 
- lock_guard<mutex> l(*exec_state->lock()); - int64_t start = MonotonicMillis(); - - Status client_connect_status; - ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(), - exec_state->impalad_address(), &client_connect_status); - if (!client_connect_status.ok()) { - exec_state->SetInitialStatus(client_connect_status, false); - return; - } - - TExecPlanFragmentResult thrift_result; - Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment, - rpc_params, &thrift_result); - exec_state->set_rpc_latency(MonotonicMillis() - start); - - const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2"; - - if (!rpc_status.ok()) { - const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), - PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg()); - VLOG_QUERY << err_msg; - exec_state->SetInitialStatus(Status(err_msg), true); - return; - } - - Status exec_status = Status(thrift_result.status); - if (!exec_status.ok()) { - const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), - PrintId(exec_state->fragment_instance_id()), - exec_status.msg().GetFullMessageDetails()); - VLOG_QUERY << err_msg; - exec_state->SetInitialStatus(Status(err_msg), true); - return; - } - - exec_state->SetInitialStatus(Status::OK(), true); - VLOG_FILE << "rpc succeeded: ExecPlanFragment" - << " instance_id=" << PrintId(exec_state->fragment_instance_id()); -} - void Coordinator::Cancel(const Status* cause) { lock_guard<mutex> l(lock_); - // if the query status indicates an error, cancellation has already been initiated - if (!query_status_.ok()) return; + // if the query status indicates an error, cancellation has already been initiated; // prevent others from cancelling a second time + if (!query_status_.ok()) return; // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end @@ -1335,214 +917,105 @@ void Coordinator::Cancel(const Status* cause) { } void Coordinator::CancelInternal() { - VLOG_QUERY << "Cancel() query_id=" << query_id_; - CancelFragmentInstances(); + VLOG_QUERY << "Cancel() query_id=" << query_id(); - // Report the summary with whatever progress the query made before being cancelled. - ReportQuerySummary(); -} - -void Coordinator::CancelFragmentInstances() { int num_cancelled = 0; - for (InstanceState* exec_state: fragment_instance_states_) { - DCHECK(exec_state != nullptr); - - // lock each exec_state individually to synchronize correctly with - // UpdateFragmentExecStatus() (which doesn't get the global lock_ - // to set its status) - lock_guard<mutex> l(*exec_state->lock()); - - // Nothing to cancel if the exec rpc was not sent - if (!exec_state->rpc_sent()) continue; - - // don't cancel if it already finished - if (exec_state->done()) continue; - - /// If the status is not OK, we still try to cancel - !OK status might mean - /// communication failure between fragment instance and coordinator, but fragment - /// instance might still be running. 
- - // set an error status to make sure we only cancel this once - exec_state->set_status(Status::CANCELLED); - - // if we get an error while trying to get a connection to the backend, - // keep going - Status status; - ImpalaBackendConnection backend_client( - exec_env_->impalad_client_cache(), exec_state->impalad_address(), &status); - if (!status.ok()) continue; - ++num_cancelled; - TCancelPlanFragmentParams params; - params.protocol_version = ImpalaInternalServiceVersion::V1; - params.__set_fragment_instance_id(exec_state->fragment_instance_id()); - TCancelPlanFragmentResult res; - VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id=" - << exec_state->fragment_instance_id() << " backend=" - << exec_state->impalad_address(); - Status rpc_status; - // Try to send the RPC 3 times before failing. - bool retry_is_safe; - for (int i = 0; i < 3; ++i) { - rpc_status = backend_client.DoRpc(&ImpalaBackendClient::CancelPlanFragment, - params, &res, &retry_is_safe); - if (rpc_status.ok() || !retry_is_safe) break; - } - if (!rpc_status.ok()) { - exec_state->status()->MergeStatus(rpc_status); - stringstream msg; - msg << "CancelPlanFragment rpc query_id=" << query_id_ - << " instance_id=" << exec_state->fragment_instance_id() - << " failed: " << rpc_status.msg().msg(); - // make a note of the error status, but keep on cancelling the other fragments - exec_state->status()->AddDetail(msg.str()); - continue; - } - if (res.status.status_code != TErrorCode::OK) { - exec_state->status()->AddDetail(join(res.status.error_msgs, "; ")); - } + for (BackendState* backend_state: backend_states_) { + DCHECK(backend_state != nullptr); + if (backend_state->Cancel()) ++num_cancelled; } VLOG_QUERY << Substitute( - "CancelFragmentInstances() query_id=$0, tried to cancel $1 fragment instances", - PrintId(query_id_), num_cancelled); + "CancelBackends() query_id=$0, tried to cancel $1 backends", + PrintId(query_id()), num_cancelled); + backend_completion_cv_.notify_all(); - // Notify that we completed with an error. - instance_completion_cv_.notify_all(); + // Report the summary with whatever progress the query made before being cancelled. + ComputeQuerySummary(); } -Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) { - VLOG_FILE << "UpdateFragmentExecStatus() " - << " instance=" << PrintId(params.fragment_instance_id) - << " status=" << params.status.status_code - << " done=" << (params.done ? "true" : "false"); - int instance_state_idx = GetInstanceIdx(params.fragment_instance_id); - if (instance_state_idx >= fragment_instance_states_.size()) { +Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) { + VLOG_FILE << "UpdateBackendExecStatus() backend_idx=" << params.coord_state_idx; + if (params.coord_state_idx >= backend_states_.size()) { return Status(TErrorCode::INTERNAL_ERROR, - Substitute("Unknown fragment instance index $0 (max known: $1)", - instance_state_idx, fragment_instance_states_.size() - 1)); - } - InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; - - const TRuntimeProfileTree& cumulative_profile = params.profile; - Status status(params.status); - { - lock_guard<mutex> l(*exec_state->lock()); - if (!status.ok()) { - // During query cancellation, exec_state is set to CANCELLED. However, we might - // process a non-error message from a fragment executor that is sent - // before query cancellation is invoked. Make sure we don't go from error status to - // OK. 
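// Invariant behind this guard (and behind "preserve the first non-OK" in
// FinishBackendStartup()): a recorded status only ever moves from OK to an
// error, never back, so late OK reports cannot mask an error. Reduced to a
// sketch (illustrative helper, not part of the code):
//   void ApplyStatusUpdate(Status* current, const Status& incoming) {
//     if (!incoming.ok()) *current = incoming;  // OK never overwrites an error
//   }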

-Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) {
- VLOG_FILE << "UpdateFragmentExecStatus() "
- << " instance=" << PrintId(params.fragment_instance_id)
- << " status=" << params.status.status_code
- << " done=" << (params.done ? "true" : "false");
- int instance_state_idx = GetInstanceIdx(params.fragment_instance_id);
- if (instance_state_idx >= fragment_instance_states_.size()) {
+Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
+ VLOG_FILE << "UpdateBackendExecStatus() backend_idx=" << params.coord_state_idx;
+ if (params.coord_state_idx >= backend_states_.size()) {
 return Status(TErrorCode::INTERNAL_ERROR,
- Substitute("Unknown fragment instance index $0 (max known: $1)",
- instance_state_idx, fragment_instance_states_.size() - 1));
- }
- InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
-
- const TRuntimeProfileTree& cumulative_profile = params.profile;
- Status status(params.status);
- {
- lock_guard<mutex> l(*exec_state->lock());
- if (!status.ok()) {
- // During query cancellation, exec_state is set to CANCELLED. However, we might
- // process a non-error message from a fragment executor that is sent
- // before query cancellation is invoked. Make sure we don't go from error status to
- // OK.
- exec_state->set_status(status);
- }
- exec_state->set_done(params.done);
- if (exec_state->status()->ok()) {
- // We can't update this backend's profile if ReportQuerySummary() is running,
- // because it depends on all profiles not changing during its execution (when it
- // calls SortChildren()). ReportQuerySummary() only gets called after
- // WaitForAllInstances() returns or at the end of CancelFragmentInstances().
- // WaitForAllInstances() only returns after all backends have completed (in which
- // case we wouldn't be in this function), or when there's an error, in which case
- // CancelFragmentInstances() is called. CancelFragmentInstances sets all
- // exec_state's statuses to cancelled.
- // TODO: We're losing this profile information. Call ReportQuerySummary only after
- // all backends have completed.
- exec_state->profile()->Update(cumulative_profile);
-
- // Update the average profile for the fragment corresponding to this instance.
- exec_state->profile()->ComputeTimeInProfile();
- UpdateAverageProfile(exec_state);
- UpdateExecSummary(*exec_state);
- }
- if (!exec_state->SetProfileCreated()) {
- CollectScanNodeCounters(exec_state->profile(), exec_state->aggregate_counters());
- }
-
- // Log messages aggregated by type
- if (params.__isset.error_log && params.error_log.size() > 0) {
- // Append the log messages from each update with the global state of the query
- // execution
- MergeErrorMaps(exec_state->error_log(), params.error_log);
- VLOG_FILE << "instance_id=" << exec_state->fragment_instance_id()
- << " error log: " << PrintErrorMapToString(*exec_state->error_log());
- }
- progress_.Update(exec_state->UpdateNumScanRangesCompleted());
+ Substitute("Unknown backend index $0 (max known: $1)",
+ params.coord_state_idx, backend_states_.size() - 1));
 }
+ BackendState* backend_state = backend_states_[params.coord_state_idx];
+ // ignore stray exec reports if we're already done, otherwise we lose
+ // track of num_remaining_backends_
+ if (backend_state->IsDone()) return Status::OK();
+ // TODO: return here if returned_all_results_?
+ // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
+ // path more irregular.
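UpdateBackendExecStatus() above starts with two guards: an out-of-range coord_state_idx is an internal error, and a report for a backend that already reported done is dropped, so num_remaining_backends_ is decremented exactly once per backend. A self-contained sketch of those guards, with BackendState as a simplified stand-in for the real class in coordinator-backend-state.h:

#include <cstddef>
#include <cstdio>
#include <vector>

// Simplified stand-in for Coordinator::BackendState.
struct BackendState {
  bool done = false;
  bool IsDone() const { return done; }
};

enum class UpdateResult { kInvalidIndex, kIgnoredStrayReport, kApplied };

// Mirrors the two guards: validate the index, then drop stray reports for
// backends that already finished.
UpdateResult ApplyReport(std::vector<BackendState>* states, std::size_t idx) {
  if (idx >= states->size()) return UpdateResult::kInvalidIndex;
  BackendState& state = (*states)[idx];
  if (state.IsDone()) return UpdateResult::kIgnoredStrayReport;
  state.done = true;  // stand-in for applying the exec status report
  return UpdateResult::kApplied;
}

int main() {
  std::vector<BackendState> states(2);
  std::printf("%d\n", static_cast<int>(ApplyReport(&states, 0)));  // 2: applied
  std::printf("%d\n", static_cast<int>(ApplyReport(&states, 0)));  // 1: stray report
  std::printf("%d\n", static_cast<int>(ApplyReport(&states, 5)));  // 0: invalid index
  return 0;
}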
- if (params.done && params.__isset.insert_exec_status) {
- lock_guard<mutex> l(lock_);
- // Merge in table update data (partitions written to, files to be moved as part of
- // finalization)
- for (const PartitionStatusMap::value_type& partition:
- params.insert_exec_status.per_partition_status) {
- TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
- status->__set_num_modified_rows(
- status->num_modified_rows + partition.second.num_modified_rows);
- status->__set_kudu_latest_observed_ts(std::max(
- partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
- status->__set_id(partition.second.id);
- status->__set_partition_base_dir(partition.second.partition_base_dir);
-
- if (partition.second.__isset.stats) {
- if (!status->__isset.stats) status->__set_stats(TInsertStats());
- DataSink::MergeDmlStats(partition.second.stats, &status->stats);
- }
- }
- files_to_move_.insert(
- params.insert_exec_status.files_to_move.begin(),
- params.insert_exec_status.files_to_move.end());
- }
+ bool done;
+ backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_, &done);

- if (VLOG_FILE_IS_ON) {
- stringstream s;
- exec_state->profile()->PrettyPrint(&s);
- VLOG_FILE << "profile for instance_id=" << exec_state->fragment_instance_id()
- << "\n" << s.str();
- }
- // also print the cumulative profile
- // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed
- if (VLOG_FILE_IS_ON) {
- stringstream s;
- query_profile_->PrettyPrint(&s);
- VLOG_FILE << "cumulative profile for query_id=" << query_id_
- << "\n" << s.str();
+ // TODO: only do this when the sink is done; probably missing a done field
+ // in TReportExecStatus for that
+ if (params.__isset.insert_exec_status) {
+ UpdateInsertExecStatus(params.insert_exec_status);
 }
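The params.__isset.insert_exec_status test above follows the convention of Thrift-generated C++ structs: an optional field travels with an __isset flag and is only meaningful when that flag is set. A hand-rolled illustration of the same shape (this is not actual Thrift-generated code):

#include <iostream>
#include <string>

// Stand-in for an optional Thrift struct member.
struct TInsertExecStatusLike {
  std::string files_to_move;  // illustrative member
};

struct TReportExecStatusParamsLike {
  struct Isset {
    bool insert_exec_status = false;
  } __isset;
  TInsertExecStatusLike insert_exec_status;

  void __set_insert_exec_status(const TInsertExecStatusLike& v) {
    insert_exec_status = v;
    __isset.insert_exec_status = true;  // setters flip the flag
  }
};

void HandleReport(const TReportExecStatusParamsLike& params) {
  // Without this guard we would read a default-constructed value, not data
  // the sender actually populated.
  if (params.__isset.insert_exec_status) {
    std::cout << "merging insert status\n";
  } else {
    std::cout << "no insert status in this report\n";
  }
}

int main() {
  TReportExecStatusParamsLike params;
  HandleReport(params);  // field absent
  TInsertExecStatusLike status;
  status.files_to_move = "/staging/file0.parq";
  params.__set_insert_exec_status(status);
  HandleReport(params);  // field present
  return 0;
}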
- // for now, abort the query if we see any error except if the error is cancelled
- // and returned_all_results_ is true.
+ // for now, abort the query if we see any error except if returned_all_results_ is true
+ // (UpdateStatus() initiates cancellation, if it hasn't already been)
- if (!(returned_all_results_ && status.IsCancelled()) && !status.ok()) {
- UpdateStatus(status, exec_state->fragment_instance_id(),
- TNetworkAddressToString(exec_state->impalad_address()));
+ // TODO: clarify control flow here, it's unclear we should even process this status
+ // report if returned_all_results_ is true
+ TUniqueId failed_instance_id;
+ Status status = backend_state->GetStatus(&failed_instance_id);
+ if (!status.ok() && !returned_all_results_) {
+ Status ignored = UpdateStatus(status, failed_instance_id,
+ TNetworkAddressToString(backend_state->impalad_address()));
 return Status::OK();
 }

- if (params.done) {
+ if (done) {
 lock_guard<mutex> l(lock_);
- exec_state->stopwatch()->Stop();
- DCHECK_GT(num_remaining_fragment_instances_, 0);
- VLOG_QUERY << "Fragment instance completed:"
- << " id=" << PrintId(exec_state->fragment_instance_id())
- << " host=" << exec_state->impalad_address()
- << " remaining=" << num_remaining_fragment_instances_ - 1;
- if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) {
+ DCHECK_GT(num_remaining_backends_, 0);
+ VLOG_QUERY << "Backend completed: "
+ << " host=" << backend_state->impalad_address()
+ << " remaining=" << num_remaining_backends_ - 1;
+ if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
 // print host/port info for the first backend that's still in progress as a
 // debugging aid for backend deadlocks
- for (InstanceState* exec_state: fragment_instance_states_) {
- lock_guard<mutex> l2(*exec_state->lock());
- if (!exec_state->done()) {
- VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
- << exec_state->impalad_address();
+ for (BackendState* backend_state: backend_states_) {
+ if (!backend_state->IsDone()) {
+ VLOG_QUERY << "query_id=" << query_id() << ": first in-progress backend: "
+ << backend_state->impalad_address();
 break;
 }
 }
 }
- if (--num_remaining_fragment_instances_ == 0) {
- instance_completion_cv_.notify_all();
+ if (--num_remaining_backends_ == 0 || !status.ok()) {
+ backend_completion_cv_.notify_all();
 }
 }
 return Status::OK();
 }

+void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) {
+ lock_guard<mutex> l(lock_);
+ for (const PartitionStatusMap::value_type& partition:
+ insert_exec_status.per_partition_status) {
+ TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+ status->__set_num_modified_rows(
+ status->num_modified_rows + partition.second.num_modified_rows);
+ status->__set_kudu_latest_observed_ts(std::max(
+ partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
+ status->__set_id(partition.second.id);
+ status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+ if (partition.second.__isset.stats) {
+ if (!status->__isset.stats) status->__set_stats(TInsertStats());
+ DataSink::MergeDmlStats(partition.second.stats, &status->stats);
+ }
+ }
+ files_to_move_.insert(
+ insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end());
+}
+
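UpdateInsertExecStatus() above merges each report into per_partition_status_: row counts accumulate by addition, while the Kudu timestamp is a watermark and is combined with max(), which is also what GetLatestKuduInsertTimestamp() below relies on. A sketch of both merges, with simplified types in place of the Thrift ones:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Simplified stand-in for TInsertPartitionStatus.
struct PartitionStatus {
  PartitionStatus(int64_t rows = 0, uint64_t ts = 0)
      : num_modified_rows(rows), kudu_latest_observed_ts(ts) {}
  int64_t num_modified_rows;
  uint64_t kudu_latest_observed_ts;
};

using PartitionStatusMap = std::map<std::string, PartitionStatus>;

// Row counts accumulate across reports; the Kudu timestamp is a watermark,
// so it is merged with max().
void MergeReport(PartitionStatusMap* global, const PartitionStatusMap& report) {
  for (const auto& entry : report) {
    PartitionStatus* status = &(*global)[entry.first];  // inserts if absent
    status->num_modified_rows += entry.second.num_modified_rows;
    status->kudu_latest_observed_ts = std::max(
        status->kudu_latest_observed_ts, entry.second.kudu_latest_observed_ts);
  }
}

// Query-wide watermark, as in GetLatestKuduInsertTimestamp(): the max across
// all partitions.
uint64_t LatestKuduTimestamp(const PartitionStatusMap& global) {
  uint64_t max_ts = 0;
  for (const auto& entry : global) {
    max_ts = std::max(max_ts, entry.second.kudu_latest_observed_ts);
  }
  return max_ts;
}

int main() {
  PartitionStatusMap global;
  MergeReport(&global, {{"p=1", PartitionStatus(100, 7)}});
  MergeReport(&global, {{"p=1", PartitionStatus(50, 9)}, {"p=2", PartitionStatus(10, 3)}});
  std::cout << global["p=1"].num_modified_rows << "\n";  // 150
  std::cout << LatestKuduTimestamp(global) << "\n";      // 9
  return 0;
}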
 uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
 uint64_t max_ts = 0;
 for (const auto& entry : per_partition_status_) {
@@ -1553,7 +1026,7 @@ uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
 }

 RuntimeState* Coordinator::runtime_state() {
- return executor() == NULL ? NULL : executor()->runtime_state();
+ return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }

 bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
@@ -1567,330 +1040,46 @@ bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
 return catalog_update->created_partitions.size() != 0;
 }

-// Comparator to order RuntimeProfiles by descending total time
-typedef struct {
- typedef pair<RuntimeProfile*, bool> Profile;
- bool operator()(const Profile& a, const Profile& b) const {
- // Reverse ordering: we want the longest first
- return
- a.first->total_time_counter()->value() > b.first->total_time_counter()->value();
- }
-} InstanceComparator;
-
-void Coordinator::UpdateAverageProfile(InstanceState* instance_state) {
- FragmentIdx fragment_idx = instance_state->fragment_idx();
- DCHECK_GE(fragment_idx, 0);
- DCHECK_LT(fragment_idx, fragment_profiles_.size());
- PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
-
- // No locks are taken since UpdateAverage() and AddChild() take their own locks
- if (data->averaged_profile != nullptr) {
- data->averaged_profile->UpdateAverage(instance_state->profile());
- }
- data->root_profile->AddChild(instance_state->profile());
-}
-
-void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) {
- FragmentIdx fragment_idx = instance_state->fragment_idx();
- DCHECK_GE(fragment_idx, 0);
- DCHECK_LT(fragment_idx, fragment_profiles_.size());
- PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
-
- int64_t completion_time = instance_state->stopwatch()->ElapsedTime();
- data->completion_times(completion_time);
- data->rates(instance_state->total_split_size()
- / (completion_time / 1000.0 / 1000.0 / 1000.0));
-
- // Add the child in case it has not been added previously
- // via UpdateAverageProfile(). AddChild() will do nothing if the child
- // already exists.
- data->root_profile->AddChild(instance_state->profile());
-}
-
-void Coordinator::UpdateExecSummary(const InstanceState& instance_state) {
- vector<RuntimeProfile*> children;
- instance_state.profile()->GetAllChildren(&children);
-
- lock_guard<SpinLock> l(exec_summary_lock_);
- for (int i = 0; i < children.size(); ++i) {
- int node_id = ExecNode::GetNodeIdFromProfile(children[i]);
- if (node_id == -1) continue;
-
- TPlanNodeExecSummary& exec_summary =
- exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]];
- DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size());
- DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
- exec_summary.exec_stats.size());
- TExecStats& stats =
- exec_summary.exec_stats[instance_state.per_fragment_instance_idx()];
-
- RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned");
- RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage");
- if (rows_counter != NULL) stats.__set_cardinality(rows_counter->value());
- if (mem_counter != NULL) stats.__set_memory_used(mem_counter->value());
- stats.__set_latency_ns(children[i]->local_time());
- // TODO: we don't track cpu time per node now. Do that.
- exec_summary.__isset.exec_stats = true;
- }
- VLOG(2) << PrintExecSummary(exec_summary_);
-}
-
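The removed InstanceComparator above exists only to sort per-instance profiles in descending order of total time, so the slowest instance is listed first when scanning a profile for stragglers. The same ordering with a plain lambda over simplified profiles:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for a RuntimeProfile with a total-time counter.
struct Profile {
  std::string name;
  int64_t total_time_ns;
};

int main() {
  std::vector<Profile> profiles = {
      {"Instance 1", 120}, {"Instance 2", 480}, {"Instance 3", 300}};
  // Reverse ordering: we want the longest total time first.
  std::sort(profiles.begin(), profiles.end(),
      [](const Profile& a, const Profile& b) {
        return a.total_time_ns > b.total_time_ns;
      });
  for (const Profile& p : profiles) {
    std::cout << p.name << " " << p.total_time_ns << "ns\n";
  }
  return 0;
}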
-// This function appends summary information to the query_profile_ before
-// outputting it to VLOG. It adds:
-// 1. Averaged fragment instance profiles (TODO: add outliers)
-// 2. Summary of fragment instance durations (min, max, mean, stddev)
-// 3. Summary of fragment instance rates (min, max, mean, stddev)
-// TODO: add histogram/percentile
-void Coordinator::ReportQuerySummary() {
+void Coordinator::ComputeQuerySummary() {
 // In this case, the query did not even get to start all fragment instances.
 // Some of the state that is used below might be uninitialized. In this case,
 // the query has made so little progress, reporting a summary is not very useful.
 if (!has_called_wait_) return;

- if (!fragment_instance_states_.empty()) {
- // Average all fragment instances for each fragment.
- for (InstanceState* state: fragment_instance_states_) {
- state->profile()->ComputeTimeInProfile();
- UpdateAverageProfile(state);
- // Skip coordinator fragment, if one exists.
- // TODO: Can we remove the special casing here?
- if (coord_instance_ == nullptr || state->fragment_idx() != 0) {
- ComputeFragmentSummaryStats(state);
- }
- UpdateExecSummary(*state);
- }
+ if (backend_states_.empty()) return;
+ // make sure fragment_stats_ are up-to-date
+ for (BackendState* backend_state: backend_states_) {
+ backend_state->UpdateExecStats(fragment_stats_);
+ }

- InstanceComparator comparator;
- // Per fragment instances have been collected, output summaries
- for (int i = (executor() != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
- fragment_profiles_[i].root_profile->SortChildren(comparator);
- SummaryStats& completion_times = fragment_profiles_[i].completion_times;
- SummaryStats& rates = fragment_profiles_[i].rates;
-
- stringstream times_label;
- times_label
- << "min:" << PrettyPrinter::Print(
- accumulators::min(completion_times), TUnit::TIME_NS)
- << " max:" << PrettyPrinter::Print(
- accumulators::max(completion_times), TUnit::TIME_NS)
- << " mean: " << PrettyPrinter::Print(
- accumulators::mean(completion_times), TUnit::TIME_NS)
- << " stddev:" << PrettyPrinter::Print(
- sqrt(accumulators::variance(completion_times)), TUnit::TIME_NS);
-
- stringstream rates_label;
- rates_label
- << "min:" << PrettyPrinter::Print(
- accumulators::min(rates), TUnit::BYTES_PER_SECOND)
- << " max:" << PrettyPrinter::Print(
- accumulators::max(rates), TUnit::BYTES_PER_SECOND)
- << " mean:" << PrettyPrinter::Print(
- accumulators::mean(rates), TUnit::BYTES_PER_SECOND)
- << " stddev:" << PrettyPrinter::Print(
- sqrt(accumulators::variance(rates)), TUnit::BYTES_PER_SECOND);
-
- fragment_profiles_[i].averaged_profile->AddInfoString(
- "completion times", times_label.str());
- fragment_profiles_[i].averaged_profile->AddInfoString(
- "execution rates", rates_label.str());
- fragment_profiles_[i].averaged_profile->AddInfoString(
- "num instances", lexical_cast<string>(fragment_profiles_[i].num_instances));
- }
+ for (FragmentStats* fragment_stats: fragment_stats_) {
+ fragment_stats->AddSplitStats();
+ // TODO: output the split info string and detailed stats to VLOG_FILE again?
+ fragment_stats->AddExecStats();
+ }
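The deleted boost::accumulators code above reported min/max/mean/stddev for instance completion times and execution rates (split bytes divided by completion time in seconds); AddSplitStats()/AddExecStats() now build the equivalent strings inside FragmentStats. The same four statistics can be computed directly, as this sketch with made-up sample values shows:

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct SummaryStats {
  double min = 0, max = 0, mean = 0, stddev = 0;
};

SummaryStats Summarize(const std::vector<double>& samples) {
  SummaryStats s;
  if (samples.empty()) return s;
  s.min = s.max = samples[0];
  double sum = 0;
  for (double v : samples) {
    s.min = std::min(s.min, v);
    s.max = std::max(s.max, v);
    sum += v;
  }
  s.mean = sum / samples.size();
  double sq_diff = 0;
  for (double v : samples) sq_diff += (v - s.mean) * (v - s.mean);
  s.stddev = std::sqrt(sq_diff / samples.size());  // population stddev
  return s;
}

int main() {
  // Made-up per-instance samples: completion time (ns) and split size (bytes).
  std::vector<int64_t> completion_ns = {1200000000, 900000000, 1500000000};
  std::vector<int64_t> split_bytes = {256 << 20, 128 << 20, 512 << 20};
  std::vector<double> rates;
  for (std::size_t i = 0; i < completion_ns.size(); ++i) {
    // Execution rate, as in the removed code: bytes divided by seconds.
    rates.push_back(split_bytes[i] / (completion_ns[i] / 1e9));
  }
  SummaryStats s = Summarize(rates);
  std::cout << "min=" << s.min << " max=" << s.max
            << " mean=" << s.mean << " stddev=" << s.stddev << "\n";
  return 0;
}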
-
- // Add per node peak memory usage as InfoString
- // Map from Impalad address to peak memory usage of this query
- typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
- PerNodePeakMemoryUsage per_node_peak_mem_usage;
- for (InstanceState* state: fragment_instance_states_) {
- int64_t initial_usage = 0;
- int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
- state->impalad_address(), initial_usage);
- RuntimeProfile::Counter* mem_usage_counter =
- state->profile()->GetCounter(PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER);
- if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) {
- per_node_peak_mem_usage[state->impalad_address()] = mem_usage_counter->value();
- }
- }
- stringstream info;
- for (PerNodePeakMemoryUsage::value_type entry: per_node_peak_mem_usage) {
- info << entry.first << "("
- << PrettyPrinter::Print(entry.second, TUnit::BYTES) << ") ";
- }
- query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
+ stringstream info;
+ for (BackendState* backend_state: backend_states_) {
+ info << backend_state->impalad_address() << "("
+ << PrettyPrinter::Print(backend_state->GetPeakConsumption(), TUnit::BYTES)
+ << ") ";
 }
+ query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
 }
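The "Per Node Peak Memory Usage" info string above is a per-host reduction: several fragment instances can run on one impalad, and only the maximum peak-memory counter per host is kept (BackendState::GetPeakConsumption() now does this internally). A sketch of that reduction, with plain strings for hosts instead of TNetworkAddress:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // One entry per fragment instance; several instances share a host.
  struct InstanceReport {
    std::string host;
    int64_t peak_mem_bytes;
  };
  std::vector<InstanceReport> reports = {
      {"host1:22000", 64 << 20}, {"host1:22000", 96 << 20},
      {"host2:22000", 32 << 20}};

  // Keep the maximum peak-memory value seen per host, as in the removed
  // FindOrInsert() loop.
  std::unordered_map<std::string, int64_t> per_node_peak;
  for (const InstanceReport& r : reports) {
    int64_t& peak = per_node_peak[r.host];  // inserts 0 if absent
    peak = std::max(peak, r.peak_mem_bytes);
  }
  for (const auto& entry : per_node_peak) {
    std::cout << entry.first << "(" << entry.second << " bytes) ";
  }
  std::cout << "\n";
  return 0;
}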
 string Coordinator::GetErrorLog() {
 ErrorLogMap merged;
- for (InstanceState* state: fragment_instance_states_) {
- lock_guard<mutex> l(*state->lock());
- if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log());
+ for (BackendState* state: backend_states_) {
+ state->MergeErrorLog(&merged);
 }
 return PrintErrorMapToString(merged);
 }

-void Coordinator::SetExecPlanFragmentParams(
- const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) {
- rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
- rpc_params->__set_query_ctx(query_ctx_);
-
- TPlanFragmentCtx fragment_ctx;
- TPlanFragmentInstanceCtx fragment_instance_ctx;
-
- fragment_ctx.__set_fragment(params.fragment());
- SetExecPlanDescriptorTable(params.fragment(), rpc_params);
-
- // Remove filters that weren't selected during filter routing table construction.
- if (filter_mode_ != TRuntimeFilterMode::OFF) {
- DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0);
- int instance_idx = GetInstanceIdx(params.instance_id);
- for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
- if (plan_node.__isset.runtime_filters) {
- vector<TRuntimeFilterDesc> required_filters;
- for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) {
- FilterRoutingTable::iterator filter_it =
- filter_routing_table_.find(desc.filter_id);
- if (filter_it == filter_routing_table_.end()) continue;
- const FilterState& f = filter_it->second;
- if (plan_node.__isset.hash_join_node) {
- if (f.src_fragment_instance_state_idxs().find(instance_idx) ==
- f.src_fragment_instance_state_idxs().end()) {
- DCHECK(desc.is_broadcast_join);
- continue;
- }
- }
- // We don't need a target-side check here, because a filter is either sent to
- // all its targets or none, and the none case is handled by checking if the
- // filter is in the routing table.
- required_filters.push_back(desc);
- }
- plan_node.__set_runtime_filters(required_filters);
- }
- }
- }
-
- fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
- fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
- fragment_instance_ctx.__set_per_exch_num_senders(
- params.fragment_exec_params.per_exch_num_senders);
- fragment_instance_ctx.__set_destinations(
- params.fragment_exec_params.destinations);
- fragment_instance_ctx.__set_sender_id(params.sender_id);
- fragment_instance_ctx.fragment_instance_id = params.instance_id;
- fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
- rpc_params->__set_fragment_ctx(fragment_ctx);
- rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
-}
-
-void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
- TExecPlanFragmentParams* rpc_params) {
- DCHECK(rpc_params->__isset.query_ctx);
- TDescriptorTable thrift_desc_tbl;
-
- // Always add the Tuple and Slot descriptors.
- thrift_desc_tbl.__set_tupleDescriptors(desc_tbl_.tupleDescriptors);
- thrift_desc_tbl.__set_slotDescriptors(desc_tbl_.slotDescriptors);
-
- // Collect the TTupleId(s) for ScanNode(s).
- unordered_set<TTupleId> tuple_ids;
- for (const TPlanNode& plan_node: fragment.plan.nodes) {
- switch (plan_node.node_type) {
- case TPlanNodeType::HDFS_SCAN_NODE:
- tuple_ids.insert(plan_node.hdfs_scan_node.tuple_id);
- break;
- case TPlanNodeType::KUDU_SCAN_NODE:
- tuple_ids.insert(plan_node.kudu_scan_node.tuple_id);
- break;
- case TPlanNodeType::HBASE_SCAN_NODE:
- tuple_ids.insert(plan_node.hbase_scan_node.tuple_id);
- break;
- case TPlanNodeType::DATA_SOURCE_NODE:
- tuple_ids.insert(plan_node.data_source_node.tuple_id);
- break;
- case TPlanNodeType::HASH_JOIN_NODE:
- case TPlanNodeType::AGGREGATION_NODE:
- case TPlanNodeType::SORT_NODE:
- case TPlanNodeType::EMPTY_SET_NODE:
- case TPlanNodeType::EXCHANGE_NODE:
- case TPlanNodeType::UNION_NODE:
- case TPlanNodeType::SELECT_NODE:
- case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
- case TPlanNodeType::ANALYTIC_EVAL_NODE:
- case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
- case TPlanNodeType::UNNEST_NODE:
- case TPlanNodeType::SUBPLAN_NODE:
- // Do nothing
- break;
- default:
- DCHECK(false) << "Invalid node type: " << plan_node.node_type;
- }
- }
-
- // Collect TTableId(s) matching the TTupleId(s).
- unordered_set<TTableId> table_ids;
- for (const TTupleId& tuple_id: tuple_ids) {
- for (const TTupleDescriptor& tuple_desc: desc_tbl_.tupleDescriptors) {
- if (tuple_desc.__isset.tableId && tuple_id == tuple_desc.id) {
- table_ids.insert(tuple_desc.tableId);
- }
- }
- }
-
- // Collect the tableId for the table sink.
- if (fragment.__isset.output_sink && fragment.output_sink.__isset.table_sink
- && fragment.output_sink.type == TDataSinkType::TABLE_SINK) {
- table_ids.insert(fragment.output_sink.table_sink.target_table_id);
- }
-
- // For DataStreamSinks that partition according to the partitioning scheme of a Kudu
- // table, we need the corresponding tableId.
- if (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
- && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
- && fragment.output_sink.stream_sink.output_partition.type == TPartitionType::KUDU) {
- TDataPartition partition = fragment.output_sink.stream_sink.output_partition;
- DCHECK_EQ(partition.partition_exprs.size(), 1);
- DCHECK(partition.partition_exprs[0].nodes[0].__isset.kudu_partition_expr);
- table_ids.insert(
- partition.partition_exprs[0].nodes[0].kudu_partition_expr.target_table_id);
- }
-
- // Iterate over all TTableDescriptor(s) and add the ones that are needed.
- for (const TTableDescriptor& table_desc: desc_tbl_.tableDescriptors) {
- if (table_ids.find(table_desc.id) == table_ids.end()) continue;
- thrift_desc_tbl.tableDescriptors.push_back(table_desc);
- thrift_desc_tbl.__isset.tableDescriptors = true;
- }
-
- rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl);
-}
-
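The removed SetExecPlanDescriptorTable() above prunes the descriptor table before shipping it to a backend: tuple ids are collected from the fragment's scan nodes, mapped to table ids via the tuple descriptors, and only the referenced table descriptors are serialized. A sketch of that filtering step with simplified descriptor types in place of the Thrift ones:

#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for TTupleDescriptor and TTableDescriptor.
struct TupleDesc { int id; int table_id; bool has_table_id; };
struct TableDesc { int id; std::string name; };

std::vector<TableDesc> PruneTableDescs(
    const std::unordered_set<int>& scan_tuple_ids,
    const std::vector<TupleDesc>& tuple_descs,
    const std::vector<TableDesc>& all_tables) {
  // Map the tuple ids referenced by scan nodes to their table ids.
  std::unordered_set<int> table_ids;
  for (const TupleDesc& t : tuple_descs) {
    if (t.has_table_id && scan_tuple_ids.count(t.id) > 0) {
      table_ids.insert(t.table_id);
    }
  }
  // Keep only the table descriptors that are actually referenced.
  std::vector<TableDesc> needed;
  for (const TableDesc& t : all_tables) {
    if (table_ids.count(t.id) > 0) needed.push_back(t);
  }
  return needed;
}

int main() {
  std::vector<TupleDesc> tuples = {{0, 10, true}, {1, 11, true}, {2, 0, false}};
  std::vector<TableDesc> tables = {{10, "lineitem"}, {11, "orders"}, {12, "nation"}};
  // Only tuple 0 is read by a scan node, so only "lineitem" survives.
  for (const TableDesc& t : PruneTableDescs({0}, tuples, tables)) {
    std::cout << t.name << "\n";
  }
  return 0;
}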
-namespace {
-
-// Make a PublishFilter rpc to 'impalad' for given fragment_instance_id
-// and params.
-// This takes by-value parameters because we cannot guarantee that the originating
-// coordinator won't be destroyed while this executes.
-// TODO: switch to references when we fix the lifecycle problems of coordinators.
-void DistributeFilters(shared_ptr<TPublishFilterParams> params,
- TNetworkAddress impalad, TUniqueId fragment_instance_id) {
- Status status;
- ImpalaBackendConnection backend_client(
- ExecEnv::GetInstance()->impalad_client_cache(), impalad, &status);
- if (!status.ok()) return;
- // Make a local copy of the shared 'master' set of parameters
- TPublishFilterParams local_params(*params);
- local_params.dst_instance_id = fragment_instance_id;
- local_params.__set_bloom_filter(params->bloom_filter);
- TPublishFilterResult res;
- backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res);
-};
-
-}
-
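The removed DistributeFilters() helper above shares one "master" TPublishFilterParams across all concurrent publish RPCs and copies it per destination, taking the shared_ptr by value because the originating coordinator may be destroyed while the RPC is in flight. A sketch of that copy-per-target pattern, with simplified types and a print in place of the RPC:

#include <iostream>
#include <memory>
#include <string>

// Simplified stand-in for TPublishFilterParams.
struct PublishFilterParams {
  int filter_id = 0;
  std::string bloom_filter;  // stand-in for the serialized filter payload
  std::string dst_instance_id;
};

void PublishToTarget(std::shared_ptr<PublishFilterParams> master,
    const std::string& instance_id) {
  // Copy the shared master params, then point the copy at this target. The
  // by-value shared_ptr keeps the master alive for the duration of the call.
  PublishFilterParams local(*master);
  local.dst_instance_id = instance_id;
  std::cout << "publish filter " << local.filter_id << " to "
            << local.dst_instance_id << "\n";  // stand-in for the RPC
}

int main() {
  auto master = std::make_shared<PublishFilterParams>();
  master->filter_id = 7;
  master->bloom_filter = "<serialized bloom filter>";
  for (const std::string& id : {"instance-1", "instance-2", "instance-3"}) {
    PublishToTarget(master, id);
  }
  return 0;
}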
 // TODO: call this as soon as it's clear that we won't reference the state
 // anymore, ie, in CancelInternal() and when GetNext() hits eos
 void Coordinator::TearDown() {
- DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
+ DCHECK(!torn_down_) << "Coordinator::TearDown() must not be called twice";
 torn_down_ = true;
 if (filter_routing_table_.size() > 0) {
 query_profile_->AddInfoString("Final filter table", FilterDebugString());
@@ -1925,7 +1114,7 @@ void Coordinator::TearDown() {

 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
 << "UpdateFilter() called although runtime filters are disabled";
- DCHECK(exec_complete_barrier_.get() != NULL)
+ DCHECK(exec_complete_barrier_.get() != nullptr)
 << "Filters received before fragments started!";
 exec_complete_barrier_->Wait();
 DCHECK(filter_routing_table_complete_)
@@ -1933,7 +1122,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {

 // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
 shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
- unordered_set<int> target_fragment_instance_state_idxs;
+ unordered_set<int> target_fragment_idxs;
 {
 lock_guard<SpinLock> l(filter_lock_);
 FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
@@ -1971,13 +1160,11 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 // Don't publish the filter to targets that are in the same fragment as the join
 // that produced it.
 if (target.is_local) continue;
- target_fragment_instance_state_idxs.insert(
- target.fragment_instance_state_idxs.begin(),
- target.fragment_instance_state_idxs.end());
+
<TRUNCATED>
