http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index b5745c4..5ad84df 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -52,25 +52,90 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id, query_events_(query_events), num_fragment_instances_(0), num_scan_ranges_(0), + next_instance_id_(query_id), is_admitted_(false) { fragment_exec_params_.resize(request.fragments.size()); - // Build two maps to map node ids to their fragments as well as to the offset in their - // fragment's plan's nodes list. - for (int i = 0; i < request.fragments.size(); ++i) { - int node_idx = 0; - for (const TPlanNode& node: request.fragments[i].plan.nodes) { - if (plan_node_to_fragment_idx_.size() < node.node_id + 1) { - plan_node_to_fragment_idx_.resize(node.node_id + 1); - plan_node_to_plan_node_idx_.resize(node.node_id + 1); + bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0; + + if (is_mt_execution) { + /// TODO-MT: remove else branch and move MtInit() logic here + MtInit(); + } else { + // Build two maps to map node ids to their fragments as well as to the offset in their + // fragment's plan's nodes list. + for (int i = 0; i < request.fragments.size(); ++i) { + int node_idx = 0; + for (const TPlanNode& node: request.fragments[i].plan.nodes) { + if (plan_node_to_fragment_idx_.size() < node.node_id + 1) { + plan_node_to_fragment_idx_.resize(node.node_id + 1); + plan_node_to_plan_node_idx_.resize(node.node_id + 1); + } + DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size()); + plan_node_to_fragment_idx_[node.node_id] = i; + plan_node_to_plan_node_idx_[node.node_id] = node_idx; + ++node_idx; } - DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size()); - plan_node_to_fragment_idx_[node.node_id] = i; - plan_node_to_plan_node_idx_[node.node_id] = node_idx; - ++node_idx; } } } +void QuerySchedule::MtInit() { + // extract TPlanFragments and order by fragment id + vector<const TPlanFragment*> fragments; + for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + fragments.push_back(&fragment); + } + } + sort(fragments.begin(), fragments.end(), + [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; }); + + DCHECK_EQ(mt_fragment_exec_params_.size(), 0); + for (const TPlanFragment* fragment: fragments) { + mt_fragment_exec_params_.emplace_back(*fragment); + } + + // mark coordinator fragment + const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0]; + if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) { + mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true; + next_instance_id_.lo = 1; // generated instance ids start at 1 + } + + // compute input fragments and find max node id + int max_node_id = 0; + for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + for (const TPlanNode& node: fragment.plan.nodes) { + max_node_id = max(node.node_id, max_node_id); + } + } + + // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] + for (int i = 1; i < plan_exec_info.fragments.size(); ++i) { + const TPlanFragment& fragment = 
plan_exec_info.fragments[i]; + FragmentIdx dest_idx = + plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx; + MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx]; + dest_params.input_fragments.push_back(fragment.idx); + } + } + + // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_ + plan_node_to_fragment_idx_.resize(max_node_id + 1); + plan_node_to_plan_node_idx_.resize(max_node_id + 1); + for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + for (int i = 0; i < fragment.plan.nodes.size(); ++i) { + const TPlanNode& node = fragment.plan.nodes[i]; + plan_node_to_fragment_idx_[node.node_id] = fragment.idx; + plan_node_to_plan_node_idx_[node.node_id] = i; + } + } + } +} + + int64_t QuerySchedule::GetClusterMemoryEstimate() const { DCHECK_GT(unique_hosts_.size(), 0); const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * unique_hosts_.size(); @@ -122,4 +187,73 @@ void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_ unique_hosts_ = unique_hosts; } +TUniqueId QuerySchedule::GetNextInstanceId() { + TUniqueId result = next_instance_id_; + ++next_instance_id_.lo; + return result; +} + +const TPlanFragment& FInstanceExecParams::fragment() const { + return fragment_exec_params.fragment; +} + +int QuerySchedule::GetNumFragmentInstances() const { + if (mt_fragment_exec_params_.empty()) return num_fragment_instances_; + int result = 0; + for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) { + result += fragment_exec_params.instance_exec_params.size(); + } + return result; +} + +int QuerySchedule::GetNumRemoteFInstances() const { + bool has_coordinator_fragment = GetCoordFragment() != nullptr; + int result = GetNumFragmentInstances(); + bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0; + if (is_mt_execution && has_coordinator_fragment) --result; + return result; +} + +int QuerySchedule::GetTotalFInstances() const { + int result = GetNumRemoteFInstances(); + return GetCoordFragment() != nullptr ? result + 1 : result; +} + +const TPlanFragment* QuerySchedule::GetCoordFragment() const { + bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0; + const TPlanFragment* fragment = is_mt_exec + ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0]; + if (fragment->partition.type == TPartitionType::UNPARTITIONED) { + return fragment; + } else { + return nullptr; + } +} + +void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const { + fragments->clear(); + bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0; + if (is_mt_exec) { + for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) { + for (const TPlanFragment& fragment: plan_info.fragments) { + fragments->push_back(&fragment); + } + } + } else { + for (const TPlanFragment& fragment: request_.fragments) { + fragments->push_back(&fragment); + } + } +} + +const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const { + const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0]; + DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED); + const MtFragmentExecParams* fragment_params = + &mt_fragment_exec_params_[coord_fragment.idx]; + DCHECK(fragment_params != nullptr); + DCHECK_EQ(fragment_params->instance_exec_params.size(), 1); + return fragment_params->instance_exec_params[0]; +} + }
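GetNextInstanceId() above hands out consecutive fragment instance ids derived from the query id: the query id has its bottom 4 bytes zeroed, the instance index is stored in those bytes (see the uid-util.h hunk further down), and when a coordinator fragment exists the generated ids start at index 1 so the coordinator instance can keep the query id itself (index 0). A minimal standalone sketch of that scheme, with UniqueId as a simplified stand-in for the Thrift TUniqueId (not Impala code):

    #include <cassert>
    #include <cstdint>

    // Simplified stand-in for the Thrift TUniqueId (two 64-bit halves).
    struct UniqueId {
      int64_t hi;
      int64_t lo;
    };

    // Bottom 4 bytes of 'lo' hold the instance index, as documented in uid-util.h.
    constexpr int64_t FRAGMENT_IDX_MASK = (1LL << 32) - 1;

    UniqueId CreateInstanceId(const UniqueId& query_id, int64_t instance_idx) {
      // Assumes query_id.lo has its bottom 4 bytes set to 0.
      return UniqueId{query_id.hi, query_id.lo + instance_idx};
    }

    UniqueId GetQueryId(const UniqueId& instance_id) {
      return UniqueId{instance_id.hi, instance_id.lo & ~FRAGMENT_IDX_MASK};
    }

    int64_t GetInstanceIdx(const UniqueId& instance_id) {
      return instance_id.lo & FRAGMENT_IDX_MASK;
    }

    int main() {
      UniqueId query_id{0x0011223344556677LL, 0x0123456700000000LL};

      // GetNextInstanceId() is effectively a post-increment on 'lo'; with a
      // coordinator fragment the first generated id is index 1.
      UniqueId next = query_id;
      ++next.lo;
      for (int i = 1; i <= 3; ++i) {
        UniqueId id = next;
        ++next.lo;
        assert(GetInstanceIdx(id) == i);
        assert(GetQueryId(id).hi == query_id.hi && GetQueryId(id).lo == query_id.lo);
      }
      return 0;
    }

Since the query id's bottom 4 bytes are zero, an instance id's index can always be recovered with the mask, and GetQueryId() maps every instance id back to its query id.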
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index c8ebd5d..39ce268 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -20,13 +20,14 @@ #include <vector> #include <string> +#include <unordered_map> #include <boost/unordered_set.hpp> -#include <boost/unordered_map.hpp> #include <boost/scoped_ptr.hpp> #include "common/global-types.h" #include "common/status.h" #include "util/promise.h" +#include "util/container-util.h" #include "util/runtime-profile.h" #include "gen-cpp/Types_types.h" // for TNetworkAddress #include "gen-cpp/Frontend_types.h" @@ -34,12 +35,14 @@ namespace impala { class Coordinator; +struct MtFragmentExecParams; /// map from scan node id to a list of scan ranges typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; + /// map from an impalad host address to the per-node assigned scan ranges; /// records scan range assignment for a single fragment -typedef boost::unordered_map<TNetworkAddress, PerNodeScanRanges> +typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges> FragmentScanRangeAssignment; /// execution parameters for a single fragment; used to assemble the @@ -58,14 +61,62 @@ struct FragmentExecParams { int sender_id_base; }; +/// execution parameters for a single fragment instance; used to assemble the +/// TPlanFragmentInstanceCtx +struct FInstanceExecParams { + TUniqueId instance_id; + TNetworkAddress host; // execution backend + PerNodeScanRanges per_node_scan_ranges; + + /// 0-based ordinal of this particular instance within its fragment (not: query-wide) + int per_fragment_instance_idx; + + /// In its role as a data sender, a fragment instance is assigned a "sender id" to + /// uniquely identify it to a receiver. -1 = invalid. + int sender_id; + + /// the parent MtFragmentExecParams + const MtFragmentExecParams& fragment_exec_params; + const TPlanFragment& fragment() const; + + FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host, + int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params) + : instance_id(instance_id), host(host), + per_fragment_instance_idx(per_fragment_instance_idx), + sender_id(-1), + fragment_exec_params(fragment_exec_params) {} +}; + +/// Execution parameters shared between fragment instances +struct MtFragmentExecParams { + /// output destinations of this fragment + std::vector<TPlanFragmentDestination> destinations; + + /// map from node id to the number of senders (node id expected to be for an + /// ExchangeNode) + std::map<PlanNodeId, int> per_exch_num_senders; + + // only needed as intermediate state during exec parameter computation; + // for scheduling, refer to FInstanceExecParams.per_node_scan_ranges + FragmentScanRangeAssignment scan_range_assignment; + + bool is_coord_fragment; + const TPlanFragment& fragment; + std::vector<FragmentIdx> input_fragments; + std::vector<FInstanceExecParams> instance_exec_params; + + MtFragmentExecParams(const TPlanFragment& fragment) + : is_coord_fragment(false), fragment(fragment) {} +}; + /// A QuerySchedule contains all necessary information for a query coordinator to /// generate fragment execution requests and start query execution. 
If resource management /// is enabled, then a schedule also contains the resource reservation request /// and the granted resource reservation. -/// TODO: Consider moving QuerySchedule and all Schedulers into -/// their own lib (and out of statestore). -/// TODO: Move all global state (e.g. profiles) to QueryExecState (after it is decoupled -/// from ImpalaServer) +/// +/// QuerySchedule is a container class for scheduling data, but it doesn't contain +/// scheduling logic itself. Its state either comes from the static TQueryExecRequest +/// or is computed by SimpleScheduler. class QuerySchedule { public: QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request, @@ -87,19 +138,86 @@ class QuerySchedule { int64_t GetClusterMemoryEstimate() const; /// Helper methods used by scheduler to populate this QuerySchedule. - void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; } + void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; } + + /// The following 4 functions need to be replaced once we stop special-casing + /// the coordinator instance in the coordinator. + /// The replacement is a single function int GetNumFInstances() (which includes + /// the coordinator instance). + + /// TODO-MT: remove; this is actually only the number of remote instances + /// (from the coordinator's perspective) void set_num_fragment_instances(int64_t num_fragment_instances) { num_fragment_instances_ = num_fragment_instances; } - int64_t num_fragment_instances() const { return num_fragment_instances_; } + + /// Returns the number of fragment instances registered with this schedule. + /// MT: total number of fragment instances + /// ST: value set with set_num_fragment_instances(); excludes coord instance + /// (in effect the number of remote instances) + /// TODO-MT: get rid of special-casing of coordinator instance and always return the + /// total + int GetNumFragmentInstances() const; + + /// Returns the total number of fragment instances, incl. coordinator fragment. + /// TODO-MT: remove + int GetTotalFInstances() const; + + /// Returns the number of remote fragment instances (excludes coordinator). + /// Works for both MT and ST. + /// TODO-MT: remove + int GetNumRemoteFInstances() const; + + /// Return the coordinator fragment, or nullptr if there isn't one. + const TPlanFragment* GetCoordFragment() const; + + /// Return all fragments belonging to exec request in 'fragments'. + void GetTPlanFragments(std::vector<const TPlanFragment*>* fragments) const; + int64_t num_scan_ranges() const { return num_scan_ranges_; } - /// Map node ids to the index of their fragment in TQueryExecRequest.fragments. - int32_t GetFragmentIdx(PlanNodeId id) const { return plan_node_to_fragment_idx_[id]; } + /// Map node ids to the id of their containing fragment. + FragmentIdx GetFragmentIdx(PlanNodeId id) const { + return plan_node_to_fragment_idx_[id]; + } + + /// Returns next instance id. Instance ids are consecutive numbers generated from + /// the query id. + /// If the query contains a coordinator fragment instance, the generated instance + /// ids start at 1 and the caller is responsible for assigning the correct id + /// to the coordinator instance. If the query does not contain a coordinator instance, + /// the generated instance ids start at 0. 
+ TUniqueId GetNextInstanceId(); + + const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const { + return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment; + } - /// Map node ids to the index of the node inside their plan.nodes list. + /// Map node ids to the index of their node inside their plan.nodes list. + /// TODO-MT: remove; only needed for the ST path int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; } + + const TPlanNode& GetNode(PlanNodeId id) const { + const TPlanFragment& fragment = GetContainingFragment(id); + return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]]; + } + std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; } + const std::vector<FragmentExecParams>& exec_params() const { + return fragment_exec_params_; + } + const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const { + return mt_fragment_exec_params_; + } + const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const { + return mt_fragment_exec_params_[idx]; + } + MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) { + return &mt_fragment_exec_params_[idx]; + } + + const FInstanceExecParams& GetCoordInstanceExecParams() const; + const boost::unordered_set<TNetworkAddress>& unique_hosts() const { return unique_hosts_; } @@ -111,7 +229,6 @@ class QuerySchedule { void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts); private: - /// These references are valid for the lifetime of this query schedule because they /// are all owned by the enclosing QueryExecState. const TUniqueId& query_id_; @@ -122,7 +239,7 @@ class QuerySchedule { RuntimeProfile* summary_profile_; RuntimeProfile::EventSequence* query_events_; - /// Maps from plan node id to its fragment index. Filled in c'tor. + /// Maps from plan node id to its fragment idx. Filled in c'tor. std::vector<int32_t> plan_node_to_fragment_idx_; /// Maps from plan node id to its index in plan.nodes. Filled in c'tor. @@ -132,21 +249,33 @@ class QuerySchedule { /// populated by Scheduler::Schedule() std::vector<FragmentExecParams> fragment_exec_params_; + // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams()) + // indexed by fragment idx (TPlanFragment.idx) + std::vector<MtFragmentExecParams> mt_fragment_exec_params_; + /// The set of hosts that the query will run on excluding the coordinator. boost::unordered_set<TNetworkAddress> unique_hosts_; /// Number of backends executing plan fragments on behalf of this query. + /// TODO-MT: remove int64_t num_fragment_instances_; /// Total number of scan ranges of this query. int64_t num_scan_ranges_; + /// Used to generate consecutive fragment instance ids. + TUniqueId next_instance_id_; + /// Request pool to which the request was submitted for admission. std::string request_pool_; /// Indicates if the query has been admitted for execution. bool is_admitted_; + /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info. + /// Sets is_coord_fragment and input_fragments. + /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_. 
+ void MtInit(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 4c3a967..9bff424 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -46,12 +46,10 @@ class Scheduler { /// List of server descriptors. typedef std::vector<TBackendDescriptor> BackendList; - /// Populates given query schedule whose execution is to be coordinated by coord. - /// Assigns fragments to hosts based on scan ranges in the query exec request. - /// If resource management is enabled, also reserves resources from the central - /// resource manager (Yarn via Llama) to run the query in. This function blocks until - /// the reservation request has been granted or denied. - virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule) = 0; + /// Populates given query schedule and assigns fragments to hosts based on scan + /// ranges in the query exec request. Submits schedule to admission control before + /// returning. + virtual Status Schedule(QuerySchedule* schedule) = 0; /// Releases the reserved resources (if any) from the given schedule. virtual Status Release(QuerySchedule* schedule) = 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index f3ba9a5..5b6303e 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -20,6 +20,7 @@ #include <atomic> #include <random> #include <vector> +#include <algorithm> #include <boost/algorithm/string.hpp> #include <boost/algorithm/string/join.hpp> @@ -276,6 +277,7 @@ void SimpleScheduler::UpdateMembership( } } if (metrics_ != NULL) { + /// TODO-MT: fix this (do we even need to report it?) 
num_fragment_instances_metric_->set_value(current_membership_.size()); } } @@ -305,7 +307,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec for (entry = exec_request.per_node_scan_ranges.begin(); entry != exec_request.per_node_scan_ranges.end(); ++entry) { const TPlanNodeId node_id = entry->first; - int fragment_idx = schedule->GetFragmentIdx(node_id); + FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id); const TPlanFragment& fragment = exec_request.fragments[fragment_idx]; bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED); @@ -326,11 +328,208 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec node_id, node_replica_preference, node_random_replica, entry->second, exec_request.host_list, exec_at_coord, schedule->query_options(), total_assignment_timer, assignment)); - schedule->AddScanRanges(entry->second.size()); + schedule->IncNumScanRanges(entry->second.size()); } return Status::OK(); } +Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) { + RuntimeProfile::Counter* total_assignment_timer = + ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer"); + BackendConfigPtr backend_config = GetBackendConfig(); + const TQueryExecRequest& exec_request = schedule->request(); + for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) { + for (const auto& entry: plan_exec_info.per_node_scan_ranges) { + const TPlanNodeId node_id = entry.first; + const TPlanFragment& fragment = schedule->GetContainingFragment(node_id); + bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED); + + const TPlanNode& node = schedule->GetNode(node_id); + DCHECK_EQ(node.node_id, node_id); + + const TReplicaPreference::type* node_replica_preference = + node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.replica_preference + ? &node.hdfs_scan_node.replica_preference : NULL; + bool node_random_replica = + node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.random_replica + && node.hdfs_scan_node.random_replica; + + FragmentScanRangeAssignment* assignment = + &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment; + RETURN_IF_ERROR(ComputeScanRangeAssignment( + *backend_config, node_id, node_replica_preference, node_random_replica, + entry.second, exec_request.host_list, exec_at_coord, + schedule->query_options(), total_assignment_timer, assignment)); + schedule->IncNumScanRanges(entry.second.size()); + } + } + return Status::OK(); +} + +void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) { + const TQueryExecRequest& exec_request = schedule->request(); + + // for each plan, compute the FInstanceExecParams for the tree of fragments + for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) { + // set instance_id, host, per_node_scan_ranges + MtComputeFragmentExecParams( + plan_exec_info, + schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx), + schedule); + + // Set destinations, per_exch_num_senders, sender_id. + // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]]; + // fragments[0] is an endpoint. 
+ for (int i = 0; i < plan_exec_info.dest_fragment_idx.size(); ++i) { + int dest_idx = plan_exec_info.dest_fragment_idx[i]; + DCHECK_LT(dest_idx, plan_exec_info.fragments.size()); + const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx]; + DCHECK_LT(i + 1, plan_exec_info.fragments.size()); + const TPlanFragment& src_fragment = plan_exec_info.fragments[i + 1]; + DCHECK(src_fragment.output_sink.__isset.stream_sink); + MtFragmentExecParams* dest_params = + schedule->GetFragmentExecParams(dest_fragment.idx); + MtFragmentExecParams* src_params = + schedule->GetFragmentExecParams(src_fragment.idx); + + // populate src_params->destinations + src_params->destinations.resize(dest_params->instance_exec_params.size()); + for (int j = 0; j < dest_params->instance_exec_params.size(); ++j) { + TPlanFragmentDestination& dest = src_params->destinations[j]; + dest.__set_fragment_instance_id(dest_params->instance_exec_params[j].instance_id); + dest.__set_server(dest_params->instance_exec_params[j].host); + } + + // enumerate senders consecutively; + // for distributed merge we need to enumerate senders across fragment instances + const TDataStreamSink& sink = src_fragment.output_sink.stream_sink; + DCHECK( + sink.output_partition.type == TPartitionType::UNPARTITIONED + || sink.output_partition.type == TPartitionType::HASH_PARTITIONED + || sink.output_partition.type == TPartitionType::RANDOM); + PlanNodeId exch_id = sink.dest_node_id; + int sender_id_base = dest_params->per_exch_num_senders[exch_id]; + dest_params->per_exch_num_senders[exch_id] += + src_params->instance_exec_params.size(); + for (int j = 0; j < src_params->instance_exec_params.size(); ++j) { + FInstanceExecParams& src_instance_params = src_params->instance_exec_params[j]; + src_instance_params.sender_id = sender_id_base + j; + } + } + } +} + +void SimpleScheduler::MtComputeFragmentExecParams( + const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params, + QuerySchedule* schedule) { + // traverse input fragments + for (FragmentIdx input_fragment_idx: fragment_params->input_fragments) { + MtComputeFragmentExecParams( + plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule); + } + + const TPlanFragment& fragment = fragment_params->fragment; + // TODO: deal with Union nodes + DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)); + // case 1: single instance executed at coordinator + if (fragment.partition.type == TPartitionType::UNPARTITIONED) { + const TNetworkAddress& coord = local_backend_descriptor_.address; + // make sure that the coordinator instance ends up with instance idx 0 + TUniqueId instance_id = fragment_params->is_coord_fragment + ? schedule->query_id() + : schedule->GetNextInstanceId(); + fragment_params->instance_exec_params.emplace_back( + instance_id, coord, 0, *fragment_params); + FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back(); + + // That instance gets all of the scan ranges, if there are any. 
+ if (!fragment_params->scan_range_assignment.empty()) { + DCHECK_EQ(fragment_params->scan_range_assignment.size(), 1); + auto first_entry = fragment_params->scan_range_assignment.begin(); + instance_params.per_node_scan_ranges = first_entry->second; + } + } else { + PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan); + if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { + // case 2: leaf fragment with leftmost scan + // TODO: check that there's only one scan in this fragment + MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule); + } else { + // case 3: interior fragment without leftmost scan + // we assign the same hosts as those of our leftmost input fragment (so that a + // merge aggregation fragment runs on the hosts that provide the input data) + MtCreateMirrorInstances(fragment_params, schedule); + } + } +} + +void SimpleScheduler::MtCreateScanInstances( + PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params, + QuerySchedule* schedule) { + int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop; + for (const auto& assignment_entry: fragment_params->scan_range_assignment) { + // evenly divide up the scan ranges of the leftmost scan between at most + // <dop> instances + const TNetworkAddress& host = assignment_entry.first; + auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id); + DCHECK(scan_ranges_it != assignment_entry.second.end()); + const vector<TScanRangeParams>& params_list = scan_ranges_it->second; + + int64 total_size = 0; + for (const TScanRangeParams& params: params_list) { + // TODO: implement logic for hbase and kudu + DCHECK(params.scan_range.__isset.hdfs_file_split); + total_size += params.scan_range.hdfs_file_split.length; + } + + // try to load-balance scan ranges by assigning just beyond the average number of + // bytes to each instance + // TODO: fix shortcomings introduced by uneven split sizes, + // this could end up assigning 0 scan ranges to an instance + int num_instances = ::min(max_num_instances, static_cast<int>(params_list.size())); + DCHECK_GT(num_instances, 0); + float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances; + int64_t total_assigned_bytes = 0; + int params_idx = 0; // into params_list + for (int i = 0; i < num_instances; ++i) { + fragment_params->instance_exec_params.emplace_back( + schedule->GetNextInstanceId(), host, i, *fragment_params); + FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back(); + + // threshold beyond which we want to assign to the next instance + int64_t threshold_total_bytes = avg_bytes_per_instance * (i + 1); + + // this will have assigned all scan ranges by the last instance: + // for the last instance, threshold_total_bytes == total_size and + // total_assigned_bytes won't hit total_size until everything is assigned + while (params_idx < params_list.size() + && total_assigned_bytes < threshold_total_bytes) { + const TScanRangeParams& scan_range_params = params_list[params_idx]; + instance_params.per_node_scan_ranges[leftmost_scan_id].push_back( + scan_range_params); + total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length; + ++params_idx; + } + if (params_idx >= params_list.size()) break; // nothing left to assign + } + DCHECK_EQ(params_idx, params_list.size()); // everything got assigned + } +} + +void SimpleScheduler::MtCreateMirrorInstances( + MtFragmentExecParams* fragment_params, QuerySchedule* schedule) { + 
DCHECK_GE(fragment_params->input_fragments.size(), 1); + const MtFragmentExecParams* input_fragment_params = + schedule->GetFragmentExecParams(fragment_params->input_fragments[0]); + int per_fragment_instance_idx = 0; + for (const FInstanceExecParams& input_instance_params: + input_fragment_params->instance_exec_params) { + fragment_params->instance_exec_params.emplace_back( + schedule->GetNextInstanceId(), input_instance_params.host, + per_fragment_instance_idx++, *fragment_params); + } +} + Status SimpleScheduler::ComputeScanRangeAssignment( const BackendConfig& backend_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, bool node_random_replica, @@ -459,12 +658,10 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re // assign instance ids int64_t num_fragment_instances = 0; for (FragmentExecParams& params: *fragment_exec_params) { - for (int j = 0; j < params.hosts.size(); ++j) { - int instance_idx = num_fragment_instances + j; + for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) { params.instance_ids.push_back( - CreateInstanceId(schedule->query_id(), instance_idx)); + CreateInstanceId(schedule->query_id(), num_fragment_instances)); } - num_fragment_instances += params.hosts.size(); } if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) { // the root fragment is executed directly by the coordinator @@ -511,11 +708,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request QuerySchedule* schedule) { vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params(); DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size()); - vector<TPlanNodeType::type> scan_node_types; - scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE); - scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE); - scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE); - scan_node_types.push_back(TPlanNodeType::KUDU_SCAN_NODE); // compute hosts of producer fragment before those of consumer fragment(s), // the latter might inherit the set of hosts from the former @@ -535,6 +727,9 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request // a UnionNode with partitioned joins or grouping aggregates as children runs on // at least as many hosts as the input to those children). 
if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) { + vector<TPlanNodeType::type> scan_node_types { + TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, + TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE}; vector<TPlanNodeId> scan_nodes; FindNodes(fragment.plan, scan_node_types, &scan_nodes); vector<TPlanNodeId> exch_nodes; @@ -562,7 +757,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request continue; } - PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types); + PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan); if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { // there is no leftmost scan; we assign the same hosts as those of our // leftmost input fragment (so that a partitioned aggregation fragment @@ -606,6 +801,13 @@ PlanNodeId SimpleScheduler::FindLeftmostNode( return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID; } +PlanNodeId SimpleScheduler::FindLeftmostScan(const TPlan& plan) { + vector<TPlanNodeType::type> scan_node_types { + TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, + TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE}; + return FindLeftmostNode(plan, scan_node_types); +} + bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) { for (int i = 0; i < plan.nodes.size(); ++i) { if (plan.nodes[i].node_type == type) return true; @@ -674,18 +876,35 @@ int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID; } -Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) { +Status SimpleScheduler::Schedule(QuerySchedule* schedule) { string resolved_pool; RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool( schedule->request().query_ctx, &resolved_pool)); schedule->set_request_pool(resolved_pool); schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool); - RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule)); - ComputeFragmentHosts(schedule->request(), schedule); - ComputeFragmentExecParams(schedule->request(), schedule); - if (!FLAGS_disable_admission_control) { - RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); + bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0; + if (is_mt_execution) { + RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule)); + MtComputeFragmentExecParams(schedule); + + // compute unique hosts + unordered_set<TNetworkAddress> unique_hosts; + for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) { + for (const FInstanceExecParams& i: f.instance_exec_params) { + unique_hosts.insert(i.host); + } + } + schedule->SetUniqueHosts(unique_hosts); + + // TODO-MT: call AdmitQuery() + } else { + RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule)); + ComputeFragmentHosts(schedule->request(), schedule); + ComputeFragmentExecParams(schedule->request(), schedule); + if (!FLAGS_disable_admission_control) { + RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); + } } return Status::OK(); } @@ -849,10 +1068,10 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment( if (!remote_read) total_local_assignments_->Increment(1); } - PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address, - PerNodeScanRanges()); - vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id, - 
vector<TScanRangeParams>()); + PerNodeScanRanges* scan_ranges = + FindOrInsert(assignment, backend.address, PerNodeScanRanges()); + vector<TScanRangeParams>* scan_range_params_list = + FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>()); // Add scan range. TScanRangeParams scan_range_params; scan_range_params.scan_range = scan_range_locations.scan_range; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index 9b51269..b7cc83e 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -88,7 +88,7 @@ class SimpleScheduler : public Scheduler { /// Register with the subscription manager if required virtual impala::Status Init(); - virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule); + virtual Status Schedule(QuerySchedule* schedule); virtual Status Release(QuerySchedule* schedule); private: @@ -405,7 +405,39 @@ class SimpleScheduler : public Scheduler { void ComputeFragmentExecParams(const TQueryExecRequest& exec_request, QuerySchedule* schedule); - /// For each fragment in exec_request, compute the hosts on which to run the instances + /// Compute the assignment of scan ranges to hosts for each scan node in + /// the schedule's TQueryExecRequest.mt_plan_exec_info. + /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's + /// mt_fragment_exec_params_ with the resulting scan range assignment. + Status MtComputeScanRangeAssignment(QuerySchedule* schedule); + + /// Compute the MtFragmentExecParams for all plans in the schedule's + /// TQueryExecRequest.mt_plan_exec_info. + /// This includes the routing information (destinations, per_exch_num_senders, + /// sender_id) + void MtComputeFragmentExecParams(QuerySchedule* schedule); + + /// Recursively create FInstanceExecParams and set per_node_scan_ranges for + /// fragment_params and its input fragments via a depth-first traversal. + /// All fragments are part of plan_exec_info. + void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info, + MtFragmentExecParams* fragment_params, QuerySchedule* schedule); + + /// Create instances of the fragment corresponding to fragment_params to run on the + /// selected replica hosts of the scan ranges of the node with id scan_id. + /// The maximum number of instances is the value of query option mt_dop. + /// This attempts to load balance among instances by computing the average number + /// of bytes per instances and then in a single pass assigning scan ranges to each + /// instances to roughly meet that average. + void MtCreateScanInstances(PlanNodeId scan_id, + MtFragmentExecParams* fragment_params, QuerySchedule* schedule); + + /// For each instance of the single input fragment of the fragment corresponding to + /// fragment_params, create an instance for this fragment. + void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params, + QuerySchedule* schedule); + + /// For each fragment in exec_request, computes hosts on which to run the instances /// and stores result in fragment_exec_params_.hosts. void ComputeFragmentHosts(const TQueryExecRequest& exec_request, QuerySchedule* schedule); @@ -414,6 +446,8 @@ class SimpleScheduler : public Scheduler { /// INVALID_PLAN_NODE_ID if no such node present. 
PlanNodeId FindLeftmostNode( const TPlan& plan, const std::vector<TPlanNodeType::type>& types); + /// Same for scan nodes. + PlanNodeId FindLeftmostScan(const TPlan& plan); /// Return the index (w/in exec_request.fragments) of fragment that sends its output to /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc index 9069b03..76e11d1 100644 --- a/be/src/service/fragment-exec-state.cc +++ b/be/src/service/fragment-exec-state.cc @@ -81,7 +81,6 @@ void FragmentMgr::FragmentExecState::ReportStatusCb( TReportExecStatusParams params; params.protocol_version = ImpalaInternalServiceVersion::V1; params.__set_query_id(query_ctx_.query_id); - params.__set_instance_state_idx( fragment_instance_ctx_.instance_state_idx); params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id); exec_status.SetTStatus(&params); params.__set_done(done); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc index a98bbf9..64e9a78 100644 --- a/be/src/service/fragment-mgr.cc +++ b/be/src/service/fragment-mgr.cc @@ -38,10 +38,8 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) { VLOG_QUERY << "ExecPlanFragment() instance_id=" - << exec_params.fragment_instance_ctx.fragment_instance_id - << " coord=" << exec_params.query_ctx.coord_address - << " fragment instance#=" - << exec_params.fragment_instance_ctx.instance_state_idx; + << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id) + << " coord=" << exec_params.query_ctx.coord_address; // Preparing and opening the fragment creates a thread and consumes a non-trivial // amount of memory. If we are already starved for memory, cancel the fragment as @@ -146,6 +144,7 @@ void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val, void FragmentMgr::PublishFilter(TPublishFilterResult& return_val, const TPublishFilterParams& params) { + VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id; shared_ptr<FragmentExecState> fragment_exec_state = GetFragmentExecState(params.dst_instance_id); if (fragment_exec_state.get() == NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index d5fd59a..7f9d862 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -845,9 +845,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) { // benchmarks show it to be slightly cheaper than contending for a // single generator under a lock (since random_generator is not // thread-safe).
- random_generator uuid_generator; - uuid query_uuid = uuid_generator(); - query_ctx->query_id = UuidToQueryId(query_uuid); + query_ctx->query_id = UuidToQueryId(random_generator()()); } Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state, @@ -1077,9 +1075,8 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id, void ImpalaServer::ReportExecStatus( TReportExecStatusResult& return_val, const TReportExecStatusParams& params) { - VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id - << " fragment instance#=" << params.instance_state_idx - << " instance_id=" << params.fragment_instance_id + VLOG_FILE << "ReportExecStatus()" + << " instance_id=" << PrintId(params.fragment_instance_id) << " done=" << (params.done ? "true" : "false"); // TODO: implement something more efficient here, we're currently // acquiring/releasing the map lock and doing a map lookup for @@ -1090,10 +1087,8 @@ void ImpalaServer::ReportExecStatus( // This is expected occasionally (since a report RPC might be in flight while // cancellation is happening). Return an error to the caller to get it to stop. const string& err = Substitute("ReportExecStatus(): Received report for unknown " - "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:" - " $2 done: $3)", PrintId(params.query_id), - params.instance_state_idx, PrintId(params.fragment_instance_id), - params.done); + "query ID (probably closed or cancelled). (instance: $0 done: $1)", + PrintId(params.fragment_instance_id), params.done); Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val); VLOG_QUERY << err; return; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 9b2fc88..d55ac54 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -373,7 +373,8 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp( Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( const TQueryExecRequest& query_exec_request) { // we always need at least one plan fragment - DCHECK_GT(query_exec_request.fragments.size(), 0); + DCHECK(query_exec_request.fragments.size() > 0 + || query_exec_request.mt_plan_exec_info.size() > 0); if (query_exec_request.__isset.query_plan) { stringstream plan_ss; @@ -424,26 +425,31 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( // case, the query can only have a single fragment, and that fragment needs to be // executed by the coordinator. This check confirms that. // If desc_tbl is set, the query may or may not have a coordinator fragment. + bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0; + const TPlanFragment& fragment = is_mt_exec + ? query_exec_request.mt_plan_exec_info[0].fragments[0] + : query_exec_request.fragments[0]; bool has_coordinator_fragment = - query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED; + fragment.partition.type == TPartitionType::UNPARTITIONED; DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl); { lock_guard<mutex> l(lock_); // Don't start executing the query if Cancel() was called concurrently with Exec(). 
if (is_cancelled_) return Status::CANCELLED; + // TODO: make schedule local to coordinator and move schedule_->Release() into + // Coordinator::TearDown() schedule_.reset(new QuerySchedule(query_id(), query_exec_request, exec_request_.query_options, &summary_profile_, query_events_)); - coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_)); } - Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get()); - + Status status = exec_env_->scheduler()->Schedule(schedule_.get()); { lock_guard<mutex> l(lock_); RETURN_IF_ERROR(UpdateQueryStatus(status)); } - status = coord_->Exec(*schedule_, &output_expr_ctxs_); + coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_)); + status = coord_->Exec(&output_expr_ctxs_); { lock_guard<mutex> l(lock_); RETURN_IF_ERROR(UpdateQueryStatus(status)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index d0e4275..0823981 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -395,16 +395,16 @@ Status impala::SetQueryOption(const string& key, const string& value, } break; } - case TImpalaQueryOptions::MT_NUM_CORES: { + case TImpalaQueryOptions::MT_DOP: { StringParser::ParseResult result; - const int32_t num_cores = + const int32_t dop = StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); - if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) { + if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) { return Status( - Substitute("$0 is not valid for mt_num_cores. Valid values are in " + Substitute("$0 is not valid for mt_dop. 
Valid values are in " "[0, 128].", value)); } - query_options->__set_mt_num_cores(num_cores); + query_options->__set_mt_dop(dop); break; } case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 2c25700..b1194d3 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -79,7 +79,7 @@ class TQueryOptions; QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\ QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\ QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\ - QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\ + QUERY_OPT_FN(mt_dop, MT_DOP)\ QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\ QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\ QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/container-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h index b8cd1ef..efb4f52 100644 --- a/be/src/util/container-util.h +++ b/be/src/util/container-util.h @@ -20,6 +20,7 @@ #define IMPALA_UTIL_CONTAINER_UTIL_H #include <map> +#include <unordered_map> #include <boost/unordered_map.hpp> #include "util/hash-util.h" @@ -35,6 +36,21 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) { return HashUtil::Hash(&host_port.port, sizeof(host_port.port), hash); } +} + +/// Hash function for std:: containers +namespace std { + +template<> struct hash<impala::TNetworkAddress> { + std::size_t operator()(const impala::TNetworkAddress& host_port) const { + return impala::hash_value(host_port); + } +}; + +} + +namespace impala { + struct HashTNetworkAddressPtr : public std::unary_function<TNetworkAddress*, size_t> { size_t operator()(const TNetworkAddress* const& p) const { return hash_value(*p); } }; @@ -49,6 +65,7 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b /// FindOrInsert(): if the key is present, return the value; if the key is not present, /// create a new entry (key, default_val) and return default_val. 
+/// TODO: replace with single template which takes a template param template <typename K, typename V> V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) { @@ -60,6 +77,15 @@ V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) { } template <typename K, typename V> +V* FindOrInsert(std::unordered_map<K,V>* m, const K& key, const V& default_val) { + typename std::unordered_map<K,V>::iterator it = m->find(key); + if (it == m->end()) { + it = m->insert(std::make_pair(key, default_val)).first; + } + return &it->second; +} + +template <typename K, typename V> V* FindOrInsert(boost::unordered_map<K,V>* m, const K& key, const V& default_val) { typename boost::unordered_map<K,V>::iterator it = m->find(key); if (it == m->end()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc index ebbc5eb..997cc0a 100644 --- a/be/src/util/uid-util-test.cc +++ b/be/src/util/uid-util-test.cc @@ -31,12 +31,7 @@ TEST(UidUtil, FragmentInstanceId) { for (int i = 0; i < 100; ++i) { TUniqueId instance_id = CreateInstanceId(query_id, i); - EXPECT_EQ(instance_id.hi, query_id.hi); - - TUniqueId qid = GetQueryId(instance_id); - EXPECT_EQ(qid.hi, query_id.hi); - EXPECT_EQ(qid.lo, query_id.lo); - + EXPECT_EQ(GetQueryId(instance_id), query_id); EXPECT_EQ(GetInstanceIdx(instance_id), i); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h index f0f87ec..de18464 100644 --- a/be/src/util/uid-util.h +++ b/be/src/util/uid-util.h @@ -45,7 +45,7 @@ inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id /// Query id: uuid with bottom 4 bytes set to 0 /// Fragment instance id: query id with instance index stored in the bottom 4 bytes -const int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1; +constexpr int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1; inline TUniqueId UuidToQueryId(const boost::uuids::uuid& uuid) { TUniqueId result; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ExecStats.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift index 8068b63..8e88b20 100644 --- a/common/thrift/ExecStats.thrift +++ b/common/thrift/ExecStats.thrift @@ -56,7 +56,7 @@ struct TExecStats { // node as well as per instance stats. struct TPlanNodeExecSummary { 1: required Types.TPlanNodeId node_id - 2: required i32 fragment_id + 2: required Types.TFragmentIdx fragment_idx 3: required string label 4: optional string label_detail 5: required i32 num_children @@ -64,15 +64,11 @@ struct TPlanNodeExecSummary { // Estimated stats generated by the planner 6: optional TExecStats estimated_stats - // One entry for each BE executing this plan node. + // One entry for each fragment instance executing this plan node. 7: optional list<TExecStats> exec_stats - // One entry for each BE executing this plan node. True if this plan node is still - // running. - 8: optional list<bool> is_active - // If true, this plan node is an exchange node that is the receiver of a broadcast. - 9: optional bool is_broadcast + 8: optional bool is_broadcast } // Progress counters for an in-flight query. 
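The container-util.h hunk above adds a std::hash specialization for TNetworkAddress (so std::unordered_map can key on it) and an unordered_map overload of FindOrInsert(), and leaves a TODO to fold the overloads into a single template. A possible shape for that template, sketched here with simplified stand-in types rather than the real Thrift structs:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    // Simplified stand-in for the Thrift TNetworkAddress.
    struct NetworkAddress {
      std::string hostname;
      int32_t port;
      bool operator==(const NetworkAddress& o) const {
        return hostname == o.hostname && port == o.port;
      }
    };

    // std::unordered_map needs a std::hash specialization for struct keys
    // (boost::unordered_map picked up impala::hash_value() via ADL instead).
    namespace std {
    template <>
    struct hash<NetworkAddress> {
      size_t operator()(const NetworkAddress& a) const {
        return hash<string>()(a.hostname) ^ (hash<int32_t>()(a.port) << 1);
      }
    };
    }

    // Single FindOrInsert() for any map-like container: return the mapped value,
    // inserting 'default_val' first if the key is absent.
    template <typename MapType>
    typename MapType::mapped_type* FindOrInsert(MapType* m,
        const typename MapType::key_type& key,
        const typename MapType::mapped_type& default_val) {
      auto it = m->find(key);
      if (it == m->end()) it = m->insert(std::make_pair(key, default_val)).first;
      return &it->second;
    }

    int main() {
      // Shape of FragmentScanRangeAssignment: host -> (node id -> scan ranges),
      // with int standing in for TPlanNodeId and string for TScanRangeParams.
      using PerNodeScanRanges = std::map<int, std::vector<std::string>>;
      std::unordered_map<NetworkAddress, PerNodeScanRanges> assignment;

      NetworkAddress host{"impalad-1", 22000};
      PerNodeScanRanges* ranges = FindOrInsert(&assignment, host, PerNodeScanRanges());
      std::vector<std::string>* splits = FindOrInsert(ranges, 0, std::vector<std::string>());
      splits->push_back("hdfs split [0, 64MB)");
      assert(assignment[host][0].size() == 1);
      return 0;
    }

A single template along these lines relies only on find()/insert() and the nested key_type/mapped_type typedefs, so it covers std::map, std::unordered_map and boost::unordered_map alike; presumably that is what the TODO has in mind.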
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Frontend.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 95d6ba3..91322b2 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -340,6 +340,25 @@ struct TLoadDataResp { 1: required Data.TResultRow load_summary } +// Execution parameters for a single plan; component of TQueryExecRequest +struct TPlanExecInfo { + // fragments[i] may consume the output of fragments[j > i]; + // fragments[0] is the root fragment and also the coordinator fragment, if + // it is unpartitioned. + 1: required list<Planner.TPlanFragment> fragments + + // Specifies the destination fragment of the output of each fragment. + // dest_fragment_idx.size() == fragments.size() - 1 and + // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] + // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient + 2: optional list<i32> dest_fragment_idx + + // A map from scan node ids to a list of scan range locations. + // The node ids refer to scan nodes in fragments[].plan_tree + 3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>> + per_node_scan_ranges +} + // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest() struct TQueryExecRequest { // global descriptor tbl for all fragments @@ -348,12 +367,7 @@ struct TQueryExecRequest { // fragments[i] may consume the output of fragments[j > i]; // fragments[0] is the root fragment and also the coordinator fragment, if // it is unpartitioned. - 2: required list<Planner.TPlanFragment> fragments - - // Multi-threaded execution: sequence of plans; the last one materializes - // the query result - // TODO: this will eventually supercede 'fragments' - 14: optional list<Planner.TPlanFragmentTree> mt_plans + 2: optional list<Planner.TPlanFragment> fragments // Specifies the destination fragment of the output of each fragment. // parent_fragment_idx.size() == fragments.size() - 1 and @@ -365,6 +379,12 @@ struct TQueryExecRequest { 4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>> per_node_scan_ranges + // Multi-threaded execution: exec info for all plans; the first one materializes + // the query result + // TODO: this will eventually supercede fields fragments, dest_fragment_idx, + // per_node_scan_ranges + 14: optional list<TPlanExecInfo> mt_plan_exec_info + // Metadata of the query result set (only for select) 5: optional Results.TResultSetMetadata result_set_metadata http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 089524d..3ee54ae 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -181,10 +181,11 @@ struct TQueryOptions { // "name". 43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0 - // Multi-threaded execution: number of cores per query per node. - // > 0: multi-threaded execution mode, with given number of cores + // Multi-threaded execution: degree of parallelism (= number of active threads) per + // query per backend. 
+ // > 0: multi-threaded execution mode, with given dop // 0: single-threaded execution mode - 44: optional i32 mt_num_cores = 0 + 44: optional i32 mt_dop = 0 // If true, INSERT writes to S3 go directly to their final location rather than being // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for @@ -252,6 +253,7 @@ struct TClientRequest { // TODO: Separate into FE/BE initialized vars. struct TQueryCtx { // Client request containing stmt to execute and query options. + // TODO: rename to client_request, we have too many requests 1: required TClientRequest request // A globally unique id assigned to the entire query in the BE. @@ -302,9 +304,6 @@ struct TQueryCtx { // fragment. struct TPlanFragmentCtx { 1: required Planner.TPlanFragment fragment - - // total number of instances of this fragment - 2: required i32 num_fragment_instances } // A scan range plus the parameters needed to execute that scan. @@ -329,45 +328,42 @@ struct TPlanFragmentDestination { // TODO: for range partitioning, we also need to specify the range boundaries struct TPlanFragmentInstanceCtx { // The globally unique fragment instance id. - // Format: query id + query-wide fragment index - // The query-wide fragment index starts at 0, so that the query id - // and the id of the first fragment instance (the coordinator instance) - // are identical. + // Format: query id + query-wide fragment instance index + // The query-wide fragment instance index starts at 0, so that the query id + // and the id of the first fragment instance are identical. + // If there is a coordinator instance, it is the first one, with index 0. 1: required Types.TUniqueId fragment_instance_id // Index of this fragment instance accross all instances of its parent fragment, // range [0, TPlanFragmentCtx.num_fragment_instances). - 2: required i32 fragment_instance_idx - - // Index of this fragment instance in Coordinator::fragment_instance_states_. - // TODO: remove; this is subsumed by the query-wide instance idx embedded - // in the fragment_instance_id - 3: required i32 instance_state_idx + 2: required i32 per_fragment_instance_idx // Initial scan ranges for each scan node in TPlanFragment.plan_tree - 4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges + 3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree; // needed to create a DataStreamRecvr - 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders + // TODO for per-query exec rpc: move these to TPlanFragmentCtx + 4: required map<Types.TPlanNodeId, i32> per_exch_num_senders // Output destinations, one per output partition. // The partitioning of the output is specified by // TPlanFragment.output_sink.output_partition. // The number of output partitions is destinations.size(). - 6: list<TPlanFragmentDestination> destinations + // TODO for per-query exec rpc: move these to TPlanFragmentCtx + 5: list<TPlanFragmentDestination> destinations // Debug options: perform some action in a particular phase of a particular node - 7: optional Types.TPlanNodeId debug_node_id - 8: optional PlanNodes.TExecNodePhase debug_phase - 9: optional PlanNodes.TDebugAction debug_action + 6: optional Types.TPlanNodeId debug_node_id + 7: optional PlanNodes.TExecNodePhase debug_phase + 8: optional PlanNodes.TDebugAction debug_action // The pool to which this request has been submitted. Used to update pool statistics // for admission control. 
- 10: optional string request_pool + 9: optional string request_pool // Id of this fragment in its role as a sender. - 11: optional i32 sender_id + 10: optional i32 sender_id } @@ -461,31 +457,26 @@ struct TReportExecStatusParams { 2: optional Types.TUniqueId query_id // required in V1 - // Used to look up the fragment instance state in the coordinator, same value as - // TExecPlanFragmentParams.instance_state_idx. - 3: optional i32 instance_state_idx - - // required in V1 - 4: optional Types.TUniqueId fragment_instance_id + 3: optional Types.TUniqueId fragment_instance_id // Status of fragment execution; any error status means it's done. // required in V1 - 5: optional Status.TStatus status + 4: optional Status.TStatus status // If true, fragment finished executing. // required in V1 - 6: optional bool done + 5: optional bool done // cumulative profile // required in V1 - 7: optional RuntimeProfile.TRuntimeProfileTree profile + 6: optional RuntimeProfile.TRuntimeProfileTree profile // Cumulative structural changes made by a table sink // optional in V1 - 8: optional TInsertExecStatus insert_exec_status; + 7: optional TInsertExecStatus insert_exec_status; // New errors that have not been reported to the coordinator - 9: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log; + 8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log; } struct TReportExecStatusResult { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index da41a8e..129be2d 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -213,8 +213,9 @@ enum TImpalaQueryOptions { // is always, since fields IDs are NYI). Valid values are "position" and "name". PARQUET_FALLBACK_SCHEMA_RESOLUTION, - // Multi-threaded execution: number of cores per machine - MT_NUM_CORES, + // Multi-threaded execution: degree of parallelism = number of active threads per + // backend + MT_DOP, // If true, INSERT writes to S3 go directly to their final location rather than being // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Planner.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift index eb95585..92c8681 100644 --- a/common/thrift/Planner.thrift +++ b/common/thrift/Planner.thrift @@ -31,11 +31,14 @@ include "Partitions.thrift" // plan fragment, including how to produce and how to partition its output. // It leaves out node-specific parameters needed for the actual execution. 
struct TPlanFragment { + // Ordinal number of fragment within a query; range: 0..<total # fragments> + 1: required Types.TFragmentIdx idx + // display name to be shown in the runtime profile; unique within a query - 1: required string display_name + 2: required string display_name // no plan or descriptor table: query without From clause - 2: optional PlanNodes.TPlan plan + 3: optional PlanNodes.TPlan plan // exprs that produce values for slots of output tuple (one expr per slot); // if not set, plan fragment materializes full rows of plan_tree @@ -74,6 +77,8 @@ struct TScanRangeLocation { } // A single scan range plus the hosts that serve it +// TODO: rename to TScanRangeLocationList, having this differ from the above struct +// by only a single letter has caused needless confusion struct TScanRangeLocations { 1: required PlanNodes.TScanRange scan_range // non-empty list http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Types.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index 770a414..30d168d 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -19,6 +19,7 @@ namespace cpp impala namespace java org.apache.impala.thrift typedef i64 TTimestamp +typedef i32 TFragmentIdx typedef i32 TPlanNodeId typedef i32 TTupleId typedef i32 TSlotId http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/common/TreeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java index adaee18..3231e33 100644 --- a/fe/src/main/java/org/apache/impala/common/TreeNode.java +++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java @@ -51,6 +51,22 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> { public ArrayList<NodeType> getChildren() { return children_; } /** + * Return list of all nodes of the tree rooted at 'this', obtained + * through pre-order traversal. + */ + public <C extends TreeNode<NodeType>> ArrayList<C> getNodesPreOrder() { + ArrayList<C> result = new ArrayList<C>(); + getNodesPreOrderAux(result); + return result; + } + + protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux( + ArrayList<C> result) { + result.add((C) this); + for (NodeType child: children_) child.getNodesPreOrderAux(result); + } + + /** * Count the total number of nodes in this tree. Leaf node will return 1. * Non-leaf node will include all its children. */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index e50aca5..405eebe 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -59,6 +59,11 @@ public class Planner { ctx_ = new PlannerContext(analysisResult, queryCtx); } + public TQueryCtx getQueryCtx() { return ctx_.getQueryCtx(); } + public AnalysisContext.AnalysisResult getAnalysisResult() { + return ctx_.getAnalysisResult(); + } + /** * Returns a list of plan fragments for executing an analyzed parse tree. * May return a single-node or distributed executable plan. 
If enabled (through a @@ -204,6 +209,24 @@ public class Planner { /** * Return combined explain string for all plan fragments. * Includes the estimated resource requirements from the request if set. + * Uses a default level of EXTENDED, unless overriden by the + * 'explain_level' query option. + */ + public String getExplainString(ArrayList<PlanFragment> fragments, + TQueryExecRequest request) { + // use EXTENDED by default for all non-explain statements + TExplainLevel explainLevel = TExplainLevel.EXTENDED; + // use the query option for explain stmts and tests (e.g., planner tests) + if (ctx_.getAnalysisResult().isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) { + explainLevel = ctx_.getQueryOptions().getExplain_level(); + } + return getExplainString(fragments, request, explainLevel); + } + + /** + * Return combined explain string for all plan fragments and with an + * explicit explain level. + * Includes the estimated resource requirements from the request if set. */ public String getExplainString(ArrayList<PlanFragment> fragments, TQueryExecRequest request, TExplainLevel explainLevel) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index fe1f8f1..00a3d93 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -120,6 +120,7 @@ import org.apache.impala.thrift.TLineageGraph; import org.apache.impala.thrift.TLoadDataReq; import org.apache.impala.thrift.TLoadDataResp; import org.apache.impala.thrift.TMetadataOpRequest; +import org.apache.impala.thrift.TPlanExecInfo; import org.apache.impala.thrift.TPlanFragment; import org.apache.impala.thrift.TPlanFragmentTree; import org.apache.impala.thrift.TQueryCtx; @@ -908,70 +909,123 @@ public class Frontend { } /** - * Create a populated TExecRequest corresponding to the supplied TQueryCtx. + * Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'. */ - public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString) - throws ImpalaException { - // Analyze the statement - AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx); - EventSequence timeline = analysisResult.getAnalyzer().getTimeline(); - timeline.markEvent("Analysis finished"); - Preconditions.checkNotNull(analysisResult.getStmt()); - TExecRequest result = new TExecRequest(); - result.setQuery_options(queryCtx.request.getQuery_options()); - result.setAccess_events(analysisResult.getAccessEvents()); - result.analysis_warnings = analysisResult.getAnalyzer().getWarnings(); + private TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, Planner planner, + TQueryCtx queryCtx, TQueryExecRequest queryExecRequest) { + TPlanExecInfo result = new TPlanExecInfo(); + ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder(); - if (analysisResult.isCatalogOp()) { - result.stmt_type = TStmtType.DDL; - createCatalogOpRequest(analysisResult, result); - TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph(); - if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) { - result.catalog_op_request.setLineage_graph(thriftLineageGraph); - } - // All DDL operations except for CTAS are done with analysis at this point. 
- if (!analysisResult.isCreateTableAsSelectStmt()) return result; - } else if (analysisResult.isLoadDataStmt()) { - result.stmt_type = TStmtType.LOAD; - result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList( - new TColumn("summary", Type.STRING.toThrift())))); - result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift()); - return result; - } else if (analysisResult.isSetStmt()) { - result.stmt_type = TStmtType.SET; - result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList( - new TColumn("option", Type.STRING.toThrift()), - new TColumn("value", Type.STRING.toThrift())))); - result.setSet_query_option_request(analysisResult.getSetStmt().toThrift()); - return result; + // map from fragment to its index in TPlanExecInfo.fragments; needed for + // TPlanExecInfo.dest_fragment_idx + List<ScanNode> scanNodes = Lists.newArrayList(); + Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap(); + for (int idx = 0; idx < fragments.size(); ++idx) { + PlanFragment fragment = fragments.get(idx); + Preconditions.checkNotNull(fragment.getPlanRoot()); + fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes); + fragmentIdx.put(fragment, idx); } - // create TQueryExecRequest - Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt() - || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt() - || analysisResult.isDeleteStmt()); + // set fragment destinations + for (int i = 1; i < fragments.size(); ++i) { + PlanFragment dest = fragments.get(i).getDestFragment(); + Integer idx = fragmentIdx.get(dest); + Preconditions.checkState(idx != null); + result.addToDest_fragment_idx(idx.intValue()); + } - TQueryExecRequest queryExecRequest = new TQueryExecRequest(); - // create plan - LOG.debug("create plan"); - Planner planner = new Planner(analysisResult, queryCtx); - if (RuntimeEnv.INSTANCE.isTestEnv() - && queryCtx.request.query_options.mt_num_cores > 0) { - // TODO: this is just to be able to run tests; implement this - List<PlanFragment> planRoots = planner.createParallelPlans(); - for (PlanFragment planRoot: planRoots) { - TPlanFragmentTree thriftPlan = planRoot.treeToThrift(); - queryExecRequest.addToMt_plans(thriftPlan); + // Set scan ranges/locations for scan nodes. + LOG.debug("get scan range locations"); + Set<TTableName> tablesMissingStats = Sets.newTreeSet(); + Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); + for (ScanNode scanNode: scanNodes) { + result.putToPer_node_scan_ranges( + scanNode.getId().asInt(), scanNode.getScanRangeLocations()); + if (scanNode.isTableMissingStats()) { + tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift()); } - queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift()); - queryExecRequest.setQuery_ctx(queryCtx); - explainString.append(planner.getExplainString( - Lists.newArrayList(planRoots.get(0)), queryExecRequest, - TExplainLevel.STANDARD)); - queryExecRequest.setQuery_plan(explainString.toString()); - result.setQuery_exec_request(queryExecRequest); - return result; + if (scanNode.hasCorruptTableStats()) { + tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift()); + } + } + + for (TTableName tableName: tablesMissingStats) { + queryCtx.addToTables_missing_stats(tableName); } + for (TTableName tableName: tablesWithCorruptStats) { + queryCtx.addToTables_with_corrupt_stats(tableName); + } + + // The fragment at this point has all state set, serialize it to thrift. 
+ for (PlanFragment fragment: fragments) { + TPlanFragment thriftFragment = fragment.toThrift(); + result.addToFragments(thriftFragment); + } + + return result; + } + + /** + * Create a populated TQueryExecRequest, corresponding to the supplied planner, + * for multi-threaded execution. + */ + private TQueryExecRequest mtCreateExecRequest( + Planner planner, StringBuilder explainString) + throws ImpalaException { + TQueryCtx queryCtx = planner.getQueryCtx(); + Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0); + // for now, always disable spilling in the backend + // TODO-MT: re-enable spilling + queryCtx.setDisable_spilling(true); + TQueryExecRequest result = new TQueryExecRequest(); + + LOG.debug("create mt plan"); + List<PlanFragment> planRoots = planner.createParallelPlans(); + + // create EXPLAIN output + result.setQuery_ctx(queryCtx); // needed by getExplainString() + explainString.append( + planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result)); + result.setQuery_plan(explainString.toString()); + + // create per-plan exec info; + // also assemble list of names of tables with missing or corrupt stats for + // assembling a warning message + for (PlanFragment planRoot: planRoots) { + result.addToMt_plan_exec_info( + createPlanExecInfo(planRoot, planner, queryCtx, result)); + } + + // assign fragment ids + int idx = 0; + for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) { + for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++); + } + + // TODO-MT: implement + // Compute resource requirements after scan range locations because the cost + // estimates of scan nodes rely on them. + //try { + //planner.computeResourceReqs(fragments, true, queryExecRequest); + //} catch (Exception e) { + //// Turn exceptions into a warning to allow the query to execute. + //LOG.error("Failed to compute resource requirements for query\n" + + //queryCtx.request.getStmt(), e); + //} + + return result; + } + + /** + * Create a populated TQueryExecRequest corresponding to the supplied TQueryCtx. + * TODO-MT: remove this function and rename mtCreateExecRequest() to + * createExecRequest() + */ + private TQueryExecRequest createExecRequest( + Planner planner, StringBuilder explainString) + throws ImpalaException { + LOG.debug("create plan"); ArrayList<PlanFragment> fragments = planner.createPlan(); List<ScanNode> scanNodes = Lists.newArrayList(); @@ -986,12 +1040,13 @@ public class Frontend { fragmentIdx.put(fragment, idx); } + TQueryExecRequest result = new TQueryExecRequest(); // set fragment destinations for (int i = 1; i < fragments.size(); ++i) { PlanFragment dest = fragments.get(i).getDestFragment(); Integer idx = fragmentIdx.get(dest); Preconditions.checkState(idx != null); - queryExecRequest.addToDest_fragment_idx(idx.intValue()); + result.addToDest_fragment_idx(idx.intValue()); } // Set scan ranges/locations for scan nodes. 
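Note on the new request layout built above: TPlanFragment.idx is assigned query-wide (sequentially across every TPlanExecInfo in mtCreateExecRequest()), while dest_fragment_idx is positional within a single plan's fragment list, following the convention "fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]". The following is a minimal sketch of consuming that layout, assuming a TQueryExecRequest whose mt_plan_exec_info has been populated as in the patch; the class and method names here are illustrative only and not part of the change. Field access mirrors the Thrift-generated Java structs used in the diff.

import java.util.List;

import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TQueryExecRequest;

public class MtPlanExecInfoWalker {
  // Prints, for every non-root fragment of every plan, which fragment receives
  // its output. dest_fragment_idx.get(i - 1) is a position within this plan's
  // 'fragments' list; 'idx' is the query-wide fragment index.
  public static void printDestinations(TQueryExecRequest request) {
    for (TPlanExecInfo planExecInfo : request.mt_plan_exec_info) {
      List<TPlanFragment> fragments = planExecInfo.fragments;
      // fragments.get(0) is the plan's root fragment and has no destination.
      for (int i = 1; i < fragments.size(); ++i) {
        int destPos = planExecInfo.dest_fragment_idx.get(i - 1);
        System.out.println("fragment idx=" + fragments.get(i).idx
            + " -> dest fragment idx=" + fragments.get(destPos).idx);
      }
    }
  }
}

This positional convention is exactly what createPlanExecInfo() encodes through its fragmentIdx map; the query-wide idx values are only stamped afterwards, in the "assign fragment ids" loop of mtCreateExecRequest().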
@@ -1001,9 +1056,8 @@ public class Frontend { // Assemble a similar list for corrupt stats Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); for (ScanNode scanNode: scanNodes) { - queryExecRequest.putToPer_node_scan_ranges( - scanNode.getId().asInt(), - scanNode.getScanRangeLocations()); + result.putToPer_node_scan_ranges( + scanNode.getId().asInt(), scanNode.getScanRangeLocations()); if (scanNode.isTableMissingStats()) { tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift()); } @@ -1012,7 +1066,7 @@ public class Frontend { } } - queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList()); + TQueryCtx queryCtx = planner.getQueryCtx(); for (TTableName tableName: tablesMissingStats) { queryCtx.addToTables_missing_stats(tableName); } @@ -1022,6 +1076,7 @@ public class Frontend { // Optionally disable spilling in the backend. Allow spilling if there are plan hints // or if all tables have stats. + AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult(); if (queryCtx.request.query_options.isDisable_unsafe_spills() && !tablesMissingStats.isEmpty() && !analysisResult.getAnalyzer().hasPlanHints()) { @@ -1031,33 +1086,83 @@ public class Frontend { // Compute resource requirements after scan range locations because the cost // estimates of scan nodes rely on them. try { - planner.computeResourceReqs(fragments, true, queryExecRequest); + planner.computeResourceReqs(fragments, true, result); } catch (Exception e) { // Turn exceptions into a warning to allow the query to execute. LOG.error("Failed to compute resource requirements for query\n" + queryCtx.request.getStmt(), e); } - // The fragment at this point has all state set, serialize it to thrift. - for (PlanFragment fragment: fragments) { + // The fragment at this point has all state set, assign sequential ids + // and serialize to thrift. + for (int i = 0; i < fragments.size(); ++i) { + PlanFragment fragment = fragments.get(i); TPlanFragment thriftFragment = fragment.toThrift(); - queryExecRequest.addToFragments(thriftFragment); + thriftFragment.setIdx(i); + result.addToFragments(thriftFragment); } - // Use EXTENDED by default for all non-explain statements. - TExplainLevel explainLevel = TExplainLevel.EXTENDED; - // Use the query option for explain stmts and tests (e.g., planner tests). - if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) { - explainLevel = queryCtx.request.query_options.getExplain_level(); + result.setQuery_ctx(queryCtx); // needed by getExplainString() + explainString.append(planner.getExplainString(fragments, result)); + result.setQuery_plan(explainString.toString()); + return result; + } + + /** + * Create a populated TExecRequest corresponding to the supplied TQueryCtx. 
+ */ + public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString) + throws ImpalaException { + // Analyze the statement + AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx); + EventSequence timeline = analysisResult.getAnalyzer().getTimeline(); + timeline.markEvent("Analysis finished"); + Preconditions.checkNotNull(analysisResult.getStmt()); + TExecRequest result = new TExecRequest(); + result.setQuery_options(queryCtx.request.getQuery_options()); + result.setAccess_events(analysisResult.getAccessEvents()); + result.analysis_warnings = analysisResult.getAnalyzer().getWarnings(); + + if (analysisResult.isCatalogOp()) { + result.stmt_type = TStmtType.DDL; + createCatalogOpRequest(analysisResult, result); + TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph(); + if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) { + result.catalog_op_request.setLineage_graph(thriftLineageGraph); + } + // All DDL operations except for CTAS are done with analysis at this point. + if (!analysisResult.isCreateTableAsSelectStmt()) return result; + } else if (analysisResult.isLoadDataStmt()) { + result.stmt_type = TStmtType.LOAD; + result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList( + new TColumn("summary", Type.STRING.toThrift())))); + result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift()); + return result; + } else if (analysisResult.isSetStmt()) { + result.stmt_type = TStmtType.SET; + result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList( + new TColumn("option", Type.STRING.toThrift()), + new TColumn("value", Type.STRING.toThrift())))); + result.setSet_query_option_request(analysisResult.getSetStmt().toThrift()); + return result; } - // Global query parameters to be set in each TPlanExecRequest. 
- queryExecRequest.setQuery_ctx(queryCtx); + // create TQueryExecRequest + Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt() + || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt() + || analysisResult.isDeleteStmt()); - explainString.append( - planner.getExplainString(fragments, queryExecRequest, explainLevel)); - queryExecRequest.setQuery_plan(explainString.toString()); - queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift()); + Planner planner = new Planner(analysisResult, queryCtx); + TQueryExecRequest queryExecRequest; + if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) { + queryExecRequest = mtCreateExecRequest(planner, explainString); + } else { + queryExecRequest = createExecRequest(planner, explainString); + } + queryExecRequest.setDesc_tbl( + planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift()); + queryExecRequest.setQuery_ctx(queryCtx); + queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList()); TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph(); if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) { @@ -1071,7 +1176,6 @@ public class Frontend { } result.setQuery_exec_request(queryExecRequest); - if (analysisResult.isQueryStmt()) { // fill in the metadata LOG.debug("create result set metadata"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 4464203..284d7e5 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -374,9 +374,9 @@ public class PlannerTestBase extends FrontendTestBase { } /** - * Produces single-node and distributed plans for testCase and compares + * Produces single-node, distributed, and parallel plans for testCase and compares * plan and scan range results. - * Appends the actual single-node and distributed plan as well as the printed + * Appends the actual plans as well as the printed * scan ranges to actualOutput, along with the requisite section header. * locations to actualScanRangeLocations; compares both to the appropriate sections * of 'testCase'. @@ -430,7 +430,7 @@ public class PlannerTestBase extends FrontendTestBase { ImpalaInternalServiceConstants.NUM_NODES_ALL); } if (section == Section.PARALLELPLANS) { - queryCtx.request.query_options.mt_num_cores = 2; + queryCtx.request.query_options.mt_dop = 2; } ArrayList<String> expectedPlan = testCase.getSectionContents(section); boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();
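A usage note on the renamed option: because MT_DOP is now an entry in TImpalaQueryOptions, it should be settable from a client session like any other query option (e.g. SET MT_DOP=4), and a test or embedded caller can populate it directly on the Thrift query context, mirroring what PlannerTestBase does for the PARALLELPLANS section. Below is a minimal sketch under that assumption; the wrapper class and method names are illustrative only.

import org.apache.impala.thrift.TQueryCtx;

public class MtDopOptions {
  // Sets the degree of parallelism for multi-threaded execution on an existing
  // query context, mirroring the direct field assignment in PlannerTestBase.
  // dop == 0 keeps the single-threaded execution mode.
  public static void setMtDop(TQueryCtx queryCtx, int dop) {
    queryCtx.request.query_options.mt_dop = dop;
  }
}

Either way, the code paths in this change key off the numeric value alone: mt_dop > 0 is what the mtCreateExecRequest() precondition checks and what routes createExecRequest() to the multi-threaded plan generation.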