Repository: incubator-impala
Updated Branches:
  refs/heads/master 29faca568 -> 9b507b6ed
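[Editor's note] The central idea of the scheduler change in the diff below is that the planner no longer emits a dest_fragment_idx list; instead, each sender fragment's destination is derived from its output stream sink's dest_node_id, and the single-threaded and multi-threaded paths are unified around TPlanExecInfo/FragmentExecParams. The following standalone sketch illustrates only that destination-wiring idea; it uses simplified, hypothetical structs (Fragment, node_to_fragment_idx), not the actual Impala types or API.

    // Minimal sketch, assuming simplified stand-in types (not Impala's):
    // resolve each sender fragment's destination via its sink's dest_node_id.
    #include <iostream>
    #include <map>
    #include <vector>

    struct Fragment {
      int idx;               // position within the plan's fragment list
      bool has_stream_sink;  // the root fragment has no stream sink
      int dest_node_id;      // exchange node that receives this fragment's output
    };

    int main() {
      // Fragment 0 is the root; fragments 1 and 2 both send to exchange node 10,
      // which lives in fragment 0 (e.g. a distributed merge).
      std::vector<Fragment> fragments = {
          {0, false, -1}, {1, true, 10}, {2, true, 10}};

      // Map each exchange node id to the index of its containing fragment.
      std::map<int, int> node_to_fragment_idx = {{10, 0}};

      // Per destination exchange node, count senders and hand out sender id bases.
      std::map<int, int> per_exch_num_senders;
      for (const Fragment& src : fragments) {
        if (!src.has_stream_sink) continue;  // root fragment: no destination
        int dest_idx = node_to_fragment_idx.at(src.dest_node_id);
        int sender_id_base = per_exch_num_senders[src.dest_node_id]++;
        std::cout << "fragment " << src.idx << " -> fragment " << dest_idx
                  << " (sender_id_base=" << sender_id_base << ")\n";
      }
      return 0;
    }

In the real patch the sender count per exchange is incremented by the number of instances of the sending fragment rather than by one; the toy above uses one sender per fragment purely for brevity.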
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index bc500c8..9ffc273 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -31,7 +31,6 @@ #include "common/logging.h" #include "util/metrics.h" #include "runtime/exec-env.h" -#include "runtime/coordinator.h" #include "service/impala-server.h" #include "statestore/statestore-subscriber.h" @@ -290,55 +289,17 @@ SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const { return backend_config; } -void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config) -{ +void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config) { lock_guard<mutex> l(backend_config_lock_); backend_config_ = backend_config; } -Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request, - QuerySchedule* schedule) { - map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry; - RuntimeProfile::Counter* total_assignment_timer = - ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer"); - - BackendConfigPtr backend_config = GetBackendConfig(); - - for (entry = exec_request.per_node_scan_ranges.begin(); - entry != exec_request.per_node_scan_ranges.end(); ++entry) { - const TPlanNodeId node_id = entry->first; - FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id); - const TPlanFragment& fragment = exec_request.fragments[fragment_idx]; - bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED); - - const TPlanNode& node = fragment.plan.nodes[schedule->GetNodeIdx(node_id)]; - DCHECK_EQ(node.node_id, node_id); - - const TReplicaPreference::type* node_replica_preference = node.__isset.hdfs_scan_node - && node.hdfs_scan_node.__isset.replica_preference - ? 
&node.hdfs_scan_node.replica_preference : NULL; - - bool node_random_replica = node.__isset.hdfs_scan_node - && node.hdfs_scan_node.__isset.random_replica - && node.hdfs_scan_node.random_replica; - - FragmentScanRangeAssignment* assignment = - &(*schedule->exec_params())[fragment_idx].scan_range_assignment; - RETURN_IF_ERROR(ComputeScanRangeAssignment(*backend_config, - node_id, node_replica_preference, node_random_replica, entry->second, - exec_request.host_list, exec_at_coord, schedule->query_options(), - total_assignment_timer, assignment)); - schedule->IncNumScanRanges(entry->second.size()); - } - return Status::OK(); -} - -Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) { +Status SimpleScheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { RuntimeProfile::Counter* total_assignment_timer = ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer"); BackendConfigPtr backend_config = GetBackendConfig(); const TQueryExecRequest& exec_request = schedule->request(); - for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: exec_request.plan_exec_info) { for (const auto& entry: plan_exec_info.per_node_scan_ranges) { const TPlanNodeId node_id = entry.first; const TPlanFragment& fragment = schedule->GetContainingFragment(node_id); @@ -366,38 +327,34 @@ Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) { return Status::OK(); } -void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) { +void SimpleScheduler::ComputeFragmentExecParams(QuerySchedule* schedule) { const TQueryExecRequest& exec_request = schedule->request(); // for each plan, compute the FInstanceExecParams for the tree of fragments - for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) { + for (const TPlanExecInfo& plan_exec_info: exec_request.plan_exec_info) { // set instance_id, host, per_node_scan_ranges - MtComputeFragmentExecParams( - plan_exec_info, + ComputeFragmentExecParams(plan_exec_info, schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx), schedule); // Set destinations, per_exch_num_senders, sender_id. - // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]]; - // fragments[0] is an endpoint. 
- for (int i = 0; i < plan_exec_info.dest_fragment_idx.size(); ++i) { - int dest_idx = plan_exec_info.dest_fragment_idx[i]; + for (const TPlanFragment& src_fragment: plan_exec_info.fragments) { + if (!src_fragment.output_sink.__isset.stream_sink) continue; + FragmentIdx dest_idx = + schedule->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id); DCHECK_LT(dest_idx, plan_exec_info.fragments.size()); const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx]; - DCHECK_LT(i + 1, plan_exec_info.fragments.size()); - const TPlanFragment& src_fragment = plan_exec_info.fragments[i + 1]; - DCHECK(src_fragment.output_sink.__isset.stream_sink); - MtFragmentExecParams* dest_params = + FragmentExecParams* dest_params = schedule->GetFragmentExecParams(dest_fragment.idx); - MtFragmentExecParams* src_params = + FragmentExecParams* src_params = schedule->GetFragmentExecParams(src_fragment.idx); // populate src_params->destinations src_params->destinations.resize(dest_params->instance_exec_params.size()); - for (int j = 0; j < dest_params->instance_exec_params.size(); ++j) { - TPlanFragmentDestination& dest = src_params->destinations[j]; - dest.__set_fragment_instance_id(dest_params->instance_exec_params[j].instance_id); - dest.__set_server(dest_params->instance_exec_params[j].host); + for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) { + TPlanFragmentDestination& dest = src_params->destinations[i]; + dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id); + dest.__set_server(dest_params->instance_exec_params[i].host); } // enumerate senders consecutively; @@ -411,26 +368,24 @@ void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) { int sender_id_base = dest_params->per_exch_num_senders[exch_id]; dest_params->per_exch_num_senders[exch_id] += src_params->instance_exec_params.size(); - for (int j = 0; j < src_params->instance_exec_params.size(); ++j) { - FInstanceExecParams& src_instance_params = src_params->instance_exec_params[j]; - src_instance_params.sender_id = sender_id_base + j; + for (int i = 0; i < src_params->instance_exec_params.size(); ++i) { + FInstanceExecParams& src_instance_params = src_params->instance_exec_params[i]; + src_instance_params.sender_id = sender_id_base + i; } } } } -void SimpleScheduler::MtComputeFragmentExecParams( - const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params, +void SimpleScheduler::ComputeFragmentExecParams( + const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params, QuerySchedule* schedule) { // traverse input fragments for (FragmentIdx input_fragment_idx: fragment_params->input_fragments) { - MtComputeFragmentExecParams( + ComputeFragmentExecParams( plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule); } const TPlanFragment& fragment = fragment_params->fragment; - // TODO: deal with Union nodes - DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)); // case 1: single instance executed at coordinator if (fragment.partition.type == TPartitionType::UNPARTITIONED) { const TNetworkAddress& coord = local_backend_descriptor_.address; @@ -439,7 +394,7 @@ void SimpleScheduler::MtComputeFragmentExecParams( ? 
schedule->query_id() : schedule->GetNextInstanceId(); fragment_params->instance_exec_params.emplace_back( - instance_id, coord, 0, *fragment_params); + instance_id, coord, 0, *fragment_params); FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back(); // That instance gets all of the scan ranges, if there are any. @@ -448,25 +403,81 @@ void SimpleScheduler::MtComputeFragmentExecParams( auto first_entry = fragment_params->scan_range_assignment.begin(); instance_params.per_node_scan_ranges = first_entry->second; } + + return; + } + + if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) { + CreateUnionInstances(fragment_params, schedule); + return; + } + + PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan); + if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { + // case 2: leaf fragment with leftmost scan + // TODO: check that there's only one scan in this fragment + CreateScanInstances(leftmost_scan_id, fragment_params, schedule); } else { - PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan); - if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { - // case 2: leaf fragment with leftmost scan - // TODO: check that there's only one scan in this fragment - MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule); - } else { - // case 3: interior fragment without leftmost scan - // we assign the same hosts as those of our leftmost input fragment (so that a - // merge aggregation fragment runs on the hosts that provide the input data) - MtCreateMirrorInstances(fragment_params, schedule); + // case 3: interior fragment without leftmost scan + // we assign the same hosts as those of our leftmost input fragment (so that a + // merge aggregation fragment runs on the hosts that provide the input data) + CreateCollocatedInstances(fragment_params, schedule); + } +} + +void SimpleScheduler::CreateUnionInstances( + FragmentExecParams* fragment_params, QuerySchedule* schedule) { + const TPlanFragment& fragment = fragment_params->fragment; + DCHECK(ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)); + + // Add hosts of scan nodes. + vector<TPlanNodeType::type> scan_node_types { + TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, + TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE}; + vector<TPlanNodeId> scan_node_ids; + FindNodes(fragment.plan, scan_node_types, &scan_node_ids); + vector<TNetworkAddress> scan_hosts; + for (TPlanNodeId id: scan_node_ids) GetScanHosts(id, *fragment_params, &scan_hosts); + + unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end()); + + // Add hosts of input fragments. 
+ for (FragmentIdx idx: fragment_params->input_fragments) { + const FragmentExecParams& input_params = *schedule->GetFragmentExecParams(idx); + for (const FInstanceExecParams& instance_params: input_params.instance_exec_params) { + hosts.insert(instance_params.host); + } + } + DCHECK(!hosts.empty()) + << "no hosts for fragment " << fragment.idx << " with a UnionNode"; + + // create a single instance per host + // TODO-MT: figure out how to parallelize Union + int per_fragment_idx = 0; + for (const TNetworkAddress& host: hosts) { + fragment_params->instance_exec_params.emplace_back( + schedule->GetNextInstanceId(), host, per_fragment_idx++, *fragment_params); + // assign all scan ranges + FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back(); + if (fragment_params->scan_range_assignment.count(host) > 0) { + instance_params.per_node_scan_ranges = fragment_params->scan_range_assignment[host]; } } } -void SimpleScheduler::MtCreateScanInstances( - PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params, +void SimpleScheduler::CreateScanInstances( + PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params, QuerySchedule* schedule) { int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop; + if (max_num_instances == 0) max_num_instances = 1; + + if (fragment_params->scan_range_assignment.empty()) { + // this scan doesn't have any scan ranges: run a single instance on the coordinator + fragment_params->instance_exec_params.emplace_back( + schedule->GetNextInstanceId(), local_backend_descriptor_.address, 0, *fragment_params); + return; + } + for (const auto& assignment_entry: fragment_params->scan_range_assignment) { // evenly divide up the scan ranges of the leftmost scan between at most // <dop> instances @@ -477,9 +488,13 @@ void SimpleScheduler::MtCreateScanInstances( int64 total_size = 0; for (const TScanRangeParams& params: params_list) { - // TODO: implement logic for hbase and kudu - DCHECK(params.scan_range.__isset.hdfs_file_split); - total_size += params.scan_range.hdfs_file_split.length; + if (params.scan_range.__isset.hdfs_file_split) { + total_size += params.scan_range.hdfs_file_split.length; + } else { + // fake load-balancing for Kudu and Hbase: every split has length 1 + // TODO: implement more accurate logic for Kudu and Hbase + ++total_size; + } } // try to load-balance scan ranges by assigning just beyond the average number of @@ -507,7 +522,12 @@ void SimpleScheduler::MtCreateScanInstances( const TScanRangeParams& scan_range_params = params_list[params_idx]; instance_params.per_node_scan_ranges[leftmost_scan_id].push_back( scan_range_params); - total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length; + if (scan_range_params.scan_range.__isset.hdfs_file_split) { + total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length; + } else { + // for Kudu and Hbase every split has length 1 + ++total_assigned_bytes; + } ++params_idx; } if (params_idx >= params_list.size()) break; // nothing left to assign @@ -516,10 +536,10 @@ void SimpleScheduler::MtCreateScanInstances( } } -void SimpleScheduler::MtCreateMirrorInstances( - MtFragmentExecParams* fragment_params, QuerySchedule* schedule) { +void SimpleScheduler::CreateCollocatedInstances( + FragmentExecParams* fragment_params, QuerySchedule* schedule) { DCHECK_GE(fragment_params->input_fragments.size(), 1); - const MtFragmentExecParams* input_fragment_params = + const FragmentExecParams* input_fragment_params = 
schedule->GetFragmentExecParams(fragment_params->input_fragments[0]); int per_fragment_instance_idx = 0; for (const FInstanceExecParams& input_instance_params: @@ -533,7 +553,7 @@ void SimpleScheduler::MtCreateMirrorInstances( Status SimpleScheduler::ComputeScanRangeAssignment( const BackendConfig& backend_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, bool node_random_replica, - const vector<TScanRangeLocations>& locations, + const vector<TScanRangeLocationList>& locations, const vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, FragmentScanRangeAssignment* assignment) { @@ -564,11 +584,11 @@ Status SimpleScheduler::ComputeScanRangeAssignment( AssignmentCtx assignment_ctx(backend_config, total_assignments_, total_local_assignments_); - vector<const TScanRangeLocations*> remote_scan_range_locations; + vector<const TScanRangeLocationList*> remote_scan_range_locations; // Loop over all scan ranges, select a backend for those with local impalads and collect // all others for later processing. - for (const TScanRangeLocations& scan_range_locations: locations) { + for (const TScanRangeLocationList& scan_range_locations: locations) { TReplicaPreference::type min_distance = TReplicaPreference::REMOTE; // Select backend host for the current scan range. @@ -643,7 +663,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment( } // End of for loop over scan ranges. // Assign remote scans to backends. - for (const TScanRangeLocations* scan_range_locations: remote_scan_range_locations) { + for (const TScanRangeLocationList* scan_range_locations: remote_scan_range_locations) { const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost(); TBackendDescriptor backend; assignment_ctx.SelectBackendOnHost(*backend_ip, &backend); @@ -656,132 +676,6 @@ Status SimpleScheduler::ComputeScanRangeAssignment( return Status::OK(); } -void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request, - QuerySchedule* schedule) { - vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params(); - // assign instance ids - int64_t num_fragment_instances = 0; - for (FragmentExecParams& params: *fragment_exec_params) { - for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) { - params.instance_ids.push_back( - CreateInstanceId(schedule->query_id(), num_fragment_instances)); - } - } - - // compute destinations and # senders per exchange node - // (the root fragment doesn't have a destination) - for (int i = 1; i < fragment_exec_params->size(); ++i) { - FragmentExecParams& params = (*fragment_exec_params)[i]; - int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1]; - DCHECK_LT(dest_fragment_idx, fragment_exec_params->size()); - FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx]; - - // set # of senders - DCHECK(exec_request.fragments[i].output_sink.__isset.stream_sink); - const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink; - // we can only handle unpartitioned (= broadcast), random-partitioned or - // hash-partitioned output at the moment - DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED - || sink.output_partition.type == TPartitionType::HASH_PARTITIONED - || sink.output_partition.type == TPartitionType::RANDOM); - PlanNodeId exch_id = sink.dest_node_id; - // we might have multiple fragments sending to this exchange node - // (distributed MERGE), which is why we need to add up the 
#senders - params.sender_id_base = dest_params.per_exch_num_senders[exch_id]; - dest_params.per_exch_num_senders[exch_id] += params.hosts.size(); - - // create one TPlanFragmentDestination per destination host - params.destinations.resize(dest_params.hosts.size()); - for (int j = 0; j < dest_params.hosts.size(); ++j) { - TPlanFragmentDestination& dest = params.destinations[j]; - dest.fragment_instance_id = dest_params.instance_ids[j]; - dest.server = dest_params.hosts[j]; - VLOG_RPC << "dest for fragment " << i << ":" - << " instance_id=" << dest.fragment_instance_id - << " server=" << dest.server; - } - } -} - -void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request, - QuerySchedule* schedule) { - vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params(); - DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size()); - - // compute hosts of producer fragment before those of consumer fragment(s), - // the latter might inherit the set of hosts from the former - for (int i = exec_request.fragments.size() - 1; i >= 0; --i) { - const TPlanFragment& fragment = exec_request.fragments[i]; - FragmentExecParams& params = (*fragment_exec_params)[i]; - if (fragment.partition.type == TPartitionType::UNPARTITIONED) { - // all single-node fragments run on the coordinator host - params.hosts.push_back(local_backend_descriptor_.address); - continue; - } - - // UnionNodes are special because they can consume multiple partitioned inputs, - // as well as execute multiple scans in the same fragment. - // Fragments containing a UnionNode are executed on the union of hosts of all - // scans in the fragment as well as the hosts of all its input fragments (s.t. - // a UnionNode with partitioned joins or grouping aggregates as children runs on - // at least as many hosts as the input to those children). - if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) { - vector<TPlanNodeType::type> scan_node_types { - TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, - TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE}; - vector<TPlanNodeId> scan_nodes; - FindNodes(fragment.plan, scan_node_types, &scan_nodes); - vector<TPlanNodeId> exch_nodes; - FindNodes(fragment.plan, - vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE), - &exch_nodes); - - // Add hosts of scan nodes. - vector<TNetworkAddress> scan_hosts; - for (int j = 0; j < scan_nodes.size(); ++j) { - GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts); - } - unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end()); - - // Add hosts of input fragments. 
- for (int j = 0; j < exch_nodes.size(); ++j) { - int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request); - const vector<TNetworkAddress>& input_fragment_hosts = - (*fragment_exec_params)[input_fragment_idx].hosts; - hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end()); - } - DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode"; - - params.hosts.assign(hosts.begin(), hosts.end()); - continue; - } - - PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan); - if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { - // there is no leftmost scan; we assign the same hosts as those of our - // leftmost input fragment (so that a partitioned aggregation fragment - // runs on the hosts that provide the input data) - int input_fragment_idx = FindLeftmostInputFragment(i, exec_request); - DCHECK_GE(input_fragment_idx, 0); - DCHECK_LT(input_fragment_idx, fragment_exec_params->size()); - params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts; - // TODO: switch to unpartitioned/coord execution if our input fragment - // is executed that way (could have been downgraded from distributed) - continue; - } - - // This fragment is executed on those hosts that have scan ranges - // for the leftmost scan. - GetScanHosts(leftmost_scan_id, exec_request, params, ¶ms.hosts); - } - - unordered_set<TNetworkAddress> unique_hosts; - for (const FragmentExecParams& exec_params: *fragment_exec_params) { - unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end()); - } - schedule->SetUniqueHosts(unique_hosts); -} - PlanNodeId SimpleScheduler::FindLeftmostNode( const TPlan& plan, const vector<TPlanNodeType::type>& types) { // the first node with num_children == 0 is the leftmost node @@ -827,11 +721,17 @@ void SimpleScheduler::FindNodes(const TPlan& plan, } void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id, - const TQueryExecRequest& exec_request, const FragmentExecParams& params, - vector<TNetworkAddress>* scan_hosts) { - map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry = - exec_request.per_node_scan_ranges.find(scan_id); - if (entry == exec_request.per_node_scan_ranges.end() || entry->second.empty()) { + const FragmentExecParams& params, vector<TNetworkAddress>* scan_hosts) { + // Get the list of impalad host from scan_range_assignment_ + for (const FragmentScanRangeAssignment::value_type& scan_range_assignment: + params.scan_range_assignment) { + const PerNodeScanRanges& per_node_scan_ranges = scan_range_assignment.second; + if (per_node_scan_ranges.find(scan_id) != per_node_scan_ranges.end()) { + scan_hosts->push_back(scan_range_assignment.first); + } + } + + if (scan_hosts->empty()) { // this scan node doesn't have any scan ranges; run it on the coordinator // TODO: we'll need to revisit this strategy once we can partition joins // (in which case this fragment might be executing a right outer join @@ -839,40 +739,6 @@ void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id, scan_hosts->push_back(local_backend_descriptor_.address); return; } - - // Get the list of impalad host from scan_range_assignment_ - for (const FragmentScanRangeAssignment::value_type& scan_range_assignment: - params.scan_range_assignment) { - scan_hosts->push_back(scan_range_assignment.first); - } -} - -int SimpleScheduler::FindLeftmostInputFragment( - int fragment_idx, const TQueryExecRequest& exec_request) { - // find the leftmost node, which we expect to be an exchage node - vector<TPlanNodeType::type> 
exch_node_type; - exch_node_type.push_back(TPlanNodeType::EXCHANGE_NODE); - PlanNodeId exch_id = - FindLeftmostNode(exec_request.fragments[fragment_idx].plan, exch_node_type); - if (exch_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) { - return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID; - } - // find the fragment that sends to this exchange node - return FindSenderFragment(exch_id, fragment_idx, exec_request); -} - -int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, - const TQueryExecRequest& exec_request) { - for (int i = 0; i < exec_request.dest_fragment_idx.size(); ++i) { - if (exec_request.dest_fragment_idx[i] != fragment_idx) continue; - const TPlanFragment& input_fragment = exec_request.fragments[i + 1]; - DCHECK(input_fragment.__isset.output_sink); - DCHECK(input_fragment.output_sink.__isset.stream_sink); - if (input_fragment.output_sink.stream_sink.dest_node_id == exch_id) return i + 1; - } - // this shouldn't happen - DCHECK(false) << "no fragment sends to exch id " << exch_id; - return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID; } Status SimpleScheduler::Schedule(QuerySchedule* schedule) { @@ -882,29 +748,26 @@ Status SimpleScheduler::Schedule(QuerySchedule* schedule) { schedule->set_request_pool(resolved_pool); schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool); - bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0; - if (is_mt_execution) { - RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule)); - MtComputeFragmentExecParams(schedule); - - // compute unique hosts - unordered_set<TNetworkAddress> unique_hosts; - for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) { - for (const FInstanceExecParams& i: f.instance_exec_params) { - unique_hosts.insert(i.host); - } - } - schedule->SetUniqueHosts(unique_hosts); + RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule)); + ComputeFragmentExecParams(schedule); +#ifndef NDEBUG + schedule->Validate(); +#endif - // TODO-MT: call AdmitQuery() - } else { - RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule)); - ComputeFragmentHosts(schedule->request(), schedule); - ComputeFragmentExecParams(schedule->request(), schedule); - if (!FLAGS_disable_admission_control) { - RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); + // compute unique hosts + unordered_set<TNetworkAddress> unique_hosts; + for (const FragmentExecParams& f: schedule->fragment_exec_params()) { + for (const FInstanceExecParams& i: f.instance_exec_params) { + unique_hosts.insert(i.host); } } + schedule->SetUniqueHosts(unique_hosts); + + // TODO-MT: call AdmitQuery() + bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0; + if (!is_mt_execution && !FLAGS_disable_admission_control) { + RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); + } return Status::OK(); } @@ -1020,7 +883,7 @@ void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_i void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment( const TBackendDescriptor& backend, PlanNodeId node_id, const vector<TNetworkAddress>& host_list, - const TScanRangeLocations& scan_range_locations, + const TScanRangeLocationList& scan_range_locations, FragmentScanRangeAssignment* assignment) { int64_t scan_range_length = 0; if (scan_range_locations.scan_range.__isset.hdfs_file_split) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler.h 
---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index 6a9d6db..cc68432 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -40,8 +40,6 @@ namespace impala { -class Coordinator; - namespace test { class SchedulerWrapper; } @@ -199,7 +197,7 @@ class SimpleScheduler : public Scheduler { /// scan range and its replica locations. void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id, const vector<TNetworkAddress>& host_list, - const TScanRangeLocations& scan_range_locations, + const TScanRangeLocationList& scan_range_locations, FragmentScanRangeAssignment* assignment); const BackendConfig& backend_config() const { return backend_config_; } @@ -331,11 +329,11 @@ class SimpleScheduler : public Scheduler { Status GetRequestPool(const std::string& user, const TQueryOptions& query_options, std::string* pool) const; - /// Compute the assignment of scan ranges to hosts for each scan node in 'schedule'. + /// Compute the assignment of scan ranges to hosts for each scan node in + /// the schedule's TQueryExecRequest.plan_exec_info. /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's /// fragment_exec_params_ with the resulting scan range assignment. - Status ComputeScanRangeAssignment(const TQueryExecRequest& exec_request, - QuerySchedule* schedule); + Status ComputeScanRangeAssignment(QuerySchedule* schedule); /// Process the list of scan ranges of a single plan node and compute scan range /// assignments (returned in 'assignment'). The result is a mapping from hosts to their @@ -397,50 +395,49 @@ class SimpleScheduler : public Scheduler { /// assignment: Output parameter, to which new assignments will be added. Status ComputeScanRangeAssignment(const BackendConfig& backend_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, - bool node_random_replica, const std::vector<TScanRangeLocations>& locations, + bool node_random_replica, const std::vector<TScanRangeLocationList>& locations, const std::vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, FragmentScanRangeAssignment* assignment); - /// Populate fragment_exec_params_ in schedule. - void ComputeFragmentExecParams(const TQueryExecRequest& exec_request, - QuerySchedule* schedule); - - /// Compute the assignment of scan ranges to hosts for each scan node in - /// the schedule's TQueryExecRequest.mt_plan_exec_info. - /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's - /// mt_fragment_exec_params_ with the resulting scan range assignment. - Status MtComputeScanRangeAssignment(QuerySchedule* schedule); - - /// Compute the MtFragmentExecParams for all plans in the schedule's - /// TQueryExecRequest.mt_plan_exec_info. + /// Compute the FragmentExecParams for all plans in the schedule's + /// TQueryExecRequest.plan_exec_info. /// This includes the routing information (destinations, per_exch_num_senders, /// sender_id) - void MtComputeFragmentExecParams(QuerySchedule* schedule); + void ComputeFragmentExecParams(QuerySchedule* schedule); /// Recursively create FInstanceExecParams and set per_node_scan_ranges for /// fragment_params and its input fragments via a depth-first traversal. /// All fragments are part of plan_exec_info. 
- void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info, - MtFragmentExecParams* fragment_params, QuerySchedule* schedule); + void ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info, + FragmentExecParams* fragment_params, QuerySchedule* schedule); + + /// Create instances of the fragment corresponding to fragment_params, which contains + /// a Union node. + /// UnionNodes are special because they can consume multiple partitioned inputs, + /// as well as execute multiple scans in the same fragment. + /// Fragments containing a UnionNode are executed on the union of hosts of all + /// scans in the fragment as well as the hosts of all its input fragments (s.t. + /// a UnionNode with partitioned joins or grouping aggregates as children runs on + /// at least as many hosts as the input to those children). + /// TODO: is this really necessary? If not, revise. + void CreateUnionInstances( + FragmentExecParams* fragment_params, QuerySchedule* schedule); /// Create instances of the fragment corresponding to fragment_params to run on the /// selected replica hosts of the scan ranges of the node with id scan_id. /// The maximum number of instances is the value of query option mt_dop. - /// This attempts to load balance among instances by computing the average number - /// of bytes per instances and then in a single pass assigning scan ranges to each + /// For HDFS, this attempts to load balance among instances by computing the average + /// number of bytes per instances and then in a single pass assigning scan ranges to each /// instances to roughly meet that average. - void MtCreateScanInstances(PlanNodeId scan_id, - MtFragmentExecParams* fragment_params, QuerySchedule* schedule); - - /// For each instance of the single input fragment of the fragment corresponding to - /// fragment_params, create an instance for this fragment. - void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params, - QuerySchedule* schedule); - - /// For each fragment in exec_request, computes hosts on which to run the instances - /// and stores result in fragment_exec_params_.hosts. - void ComputeFragmentHosts(const TQueryExecRequest& exec_request, + /// For all other storage mgrs, it load-balances the number of splits per instance. + void CreateScanInstances(PlanNodeId scan_id, + FragmentExecParams* fragment_params, QuerySchedule* schedule); + + /// For each instance of fragment_params's input fragment, create a collocated + /// instance for fragment_params's fragment. + /// Expects that fragment_params only has a single input fragment. + void CreateCollocatedInstances(FragmentExecParams* fragment_params, QuerySchedule* schedule); /// Return the id of the leftmost node of any of the given types in 'plan', or @@ -450,14 +447,9 @@ class SimpleScheduler : public Scheduler { /// Same for scan nodes. PlanNodeId FindLeftmostScan(const TPlan& plan); - /// Return the index (w/in exec_request.fragments) of fragment that sends its output to - /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode. - /// Return INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node. - int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest& exec_request); - /// Add all hosts the given scan is executed on to scan_hosts. 
- void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest& exec_request, - const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts); + void GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& params, + std::vector<TNetworkAddress>* scan_hosts); /// Return true if 'plan' contains a node of the given type. bool ContainsNode(const TPlan& plan, TPlanNodeType::type type); @@ -466,11 +458,6 @@ class SimpleScheduler : public Scheduler { void FindNodes(const TPlan& plan, const std::vector<TPlanNodeType::type>& types, std::vector<TPlanNodeId>* results); - /// Returns the index (w/in exec_request.fragments) of fragment that sends its output - /// to the given exchange in the given fragment index. - int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, - const TQueryExecRequest& exec_request); - friend class impala::test::SchedulerWrapper; FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 923c864..b335a29 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -713,7 +713,12 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include summary = exec_state->coord()->exec_summary(); } if (include_json_plan) { - fragments = exec_state->exec_request().query_exec_request.fragments; + for (const TPlanExecInfo& plan_exec_info: + exec_state->exec_request().query_exec_request.plan_exec_info) { + for (const TPlanFragment& fragment: plan_exec_info.fragments) { + fragments.push_back(fragment); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 4886bdf..c0eed50 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1599,7 +1599,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat } // Save the query fragments so that the plan can be visualised. 
- fragments = exec_state.exec_request().query_exec_request.fragments; + for (const TPlanExecInfo& plan_exec_info: + exec_state.exec_request().query_exec_request.plan_exec_info) { + fragments.insert(fragments.end(), + plan_exec_info.fragments.begin(), plan_exec_info.fragments.end()); + } all_rows_returned = exec_state.eos(); last_active_time = exec_state.last_active(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 7c40d37..5cd4c0c 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -379,8 +379,7 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp( Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( const TQueryExecRequest& query_exec_request) { // we always need at least one plan fragment - DCHECK(query_exec_request.fragments.size() > 0 - || query_exec_request.mt_plan_exec_info.size() > 0); + DCHECK(query_exec_request.plan_exec_info.size() > 0); if (query_exec_request.__isset.query_plan) { stringstream plan_ss; @@ -427,11 +426,6 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); } - bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0; - const TPlanFragment& fragment = is_mt_exec - ? query_exec_request.mt_plan_exec_info[0].fragments[0] - : query_exec_request.fragments[0]; - { lock_guard<mutex> l(lock_); // Don't start executing the query if Cancel() was called concurrently with Exec(). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/common/thrift/Frontend.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index fbbf7be..f6ba35a 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -347,15 +347,9 @@ struct TPlanExecInfo { // it is unpartitioned. 1: required list<Planner.TPlanFragment> fragments - // Specifies the destination fragment of the output of each fragment. - // dest_fragment_idx.size() == fragments.size() - 1 and - // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] - // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient - 2: optional list<i32> dest_fragment_idx - // A map from scan node ids to a list of scan range locations. - // The node ids refer to scan nodes in fragments[].plan_tree - 3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>> + // The node ids refer to scan nodes in fragments[].plan + 2: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocationList>> per_node_scan_ranges } @@ -364,57 +358,39 @@ struct TQueryExecRequest { // global descriptor tbl for all fragments 1: optional Descriptors.TDescriptorTable desc_tbl - // fragments[i] may consume the output of fragments[j > i]; - // fragments[0] is the root fragment and also the coordinator fragment, if - // it is unpartitioned. - 2: optional list<Planner.TPlanFragment> fragments - - // Specifies the destination fragment of the output of each fragment. - // parent_fragment_idx.size() == fragments.size() - 1 and - // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] - 3: optional list<i32> dest_fragment_idx - - // A map from scan node ids to a list of scan range locations. 
- // The node ids refer to scan nodes in fragments[].plan_tree - 4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>> - per_node_scan_ranges - - // Multi-threaded execution: exec info for all plans; the first one materializes - // the query result - // TODO: this will eventually supercede fields fragments, dest_fragment_idx, - // per_node_scan_ranges - 14: optional list<TPlanExecInfo> mt_plan_exec_info + // exec info for all plans; the first one materializes the query result + 2: optional list<TPlanExecInfo> plan_exec_info // Metadata of the query result set (only for select) - 5: optional Results.TResultSetMetadata result_set_metadata + 3: optional Results.TResultSetMetadata result_set_metadata // Set if the query needs finalization after it executes - 6: optional TFinalizeParams finalize_params + 4: optional TFinalizeParams finalize_params - 7: required ImpalaInternalService.TQueryCtx query_ctx + 5: required ImpalaInternalService.TQueryCtx query_ctx // The same as the output of 'explain <query>' - 8: optional string query_plan + 6: optional string query_plan // The statement type governs when the coordinator can judge a query to be finished. // DML queries are complete after Wait(), SELECTs may not be. Generally matches // the stmt_type of the parent TExecRequest, but in some cases (such as CREATE TABLE // AS SELECT), these may differ. - 9: required Types.TStmtType stmt_type + 7: required Types.TStmtType stmt_type // Estimated per-host peak memory consumption in bytes. Used for resource management. - 10: optional i64 per_host_mem_req + 8: optional i64 per_host_mem_req // Estimated per-host CPU requirements in YARN virtual cores. // Used for resource management. // TODO: Remove this and associated code in Planner. - 11: optional i16 per_host_vcores + 9: optional i16 per_host_vcores // List of replica hosts. Used by the host_idx field of TScanRangeLocation. 
- 12: required list<Types.TNetworkAddress> host_list + 10: required list<Types.TNetworkAddress> host_list // Column lineage graph - 13: optional LineageGraph.TLineageGraph lineage_graph + 11: optional LineageGraph.TLineageGraph lineage_graph } enum TCatalogOpType { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/common/thrift/Planner.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift index 92c8681..766551e 100644 --- a/common/thrift/Planner.thrift +++ b/common/thrift/Planner.thrift @@ -77,9 +77,7 @@ struct TScanRangeLocation { } // A single scan range plus the hosts that serve it -// TODO: rename to TScanRangeLocationList, having this differ from the above struct -// by only a single letter has caused needless confusion -struct TScanRangeLocations { +struct TScanRangeLocationList { 1: required PlanNodes.TScanRange scan_range // non-empty list 2: list<TScanRangeLocation> locations http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index 1f5665f..307e67e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -54,7 +54,7 @@ import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; -import org.apache.impala.thrift.TScanRangeLocations; +import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.thrift.TStatus; import com.google.common.base.Joiner; import com.google.common.base.Objects; @@ -322,7 +322,7 @@ public class DataSourceScanNode extends ScanNode { TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345"); Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress); scanRanges_ = Lists.newArrayList( - new TScanRangeLocations( + new TScanRangeLocationList( new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex)))); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index f299131..89296f1 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -59,7 +59,7 @@ import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; -import org.apache.impala.thrift.TScanRangeLocations; +import org.apache.impala.thrift.TScanRangeLocationList; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -337,13 +337,13 @@ public class HBaseScanNode extends ScanNode { // extend the key range setKeyRangeEnd(keyRange, curRegEndKey); } else { - // create a new HBaseKeyRange (and TScanRange2/TScanRangeLocations to go + // create a 
new HBaseKeyRange (and TScanRange2/TScanRangeLocationList to go // with it). keyRange = new THBaseKeyRange(); setKeyRangeStart(keyRange, curRegStartKey); setKeyRangeEnd(keyRange, curRegEndKey); - TScanRangeLocations scanRangeLocation = new TScanRangeLocations(); + TScanRangeLocationList scanRangeLocation = new TScanRangeLocationList(); TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey()); scanRangeLocation.addToLocations( new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress))); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 95d86e3..66ed792 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -51,7 +51,7 @@ import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TReplicaPreference; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; -import org.apache.impala.thrift.TScanRangeLocations; +import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.util.MembershipSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -353,7 +353,7 @@ public class HdfsScanNode extends ScanNode { fileDesc.getFileName(), currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(), fileDesc.getFileCompression(), fileDesc.getModificationTime())); - TScanRangeLocations scanRangeLocations = new TScanRangeLocations(); + TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList(); scanRangeLocations.scan_range = scanRange; scanRangeLocations.locations = locations; scanRanges_.add(scanRangeLocations); @@ -456,7 +456,7 @@ public class HdfsScanNode extends ScanNode { int totalNodes = 0; int numLocalRanges = 0; int numRemoteRanges = 0; - for (TScanRangeLocations range: scanRanges_) { + for (TScanRangeLocationList range: scanRanges_) { boolean anyLocal = false; for (TScanRangeLocation loc: range.locations) { TNetworkAddress dataNode = analyzer.getHostIndex().getEntry(loc.getHost_idx()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index d338608..7ef1c20 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -43,7 +43,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; -import org.apache.impala.thrift.TScanRangeLocations; +import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; @@ -183,7 +183,7 @@ public class KuduScanNode extends ScanNode { token.toString(), e); } - TScanRangeLocations locs = new TScanRangeLocations(); + TScanRangeLocationList locs = new TScanRangeLocationList(); locs.setScan_range(scanRange); locs.locations = locations; 
scanRanges_.add(locs); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index fade723..ec4cf7e 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -302,9 +302,7 @@ public class Planner { for (int i = 0; i < fragments.size(); ++i) { PlanFragment fragment = fragments.get(i); str.append(fragment.getExplainString(explainLevel)); - if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) { - str.append("\n"); - } + if (i < fragments.size() - 1) str.append("\n"); } } return str.toString(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/ScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index f444ffa..985a03a 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -26,7 +26,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.NotImplementedException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TNetworkAddress; -import org.apache.impala.thrift.TScanRangeLocations; +import org.apache.impala.thrift.TScanRangeLocationList; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -45,7 +45,7 @@ abstract public class ScanNode extends PlanNode { protected int numPartitionsMissingStats_ = 0; // List of scan-range locations. Populated in init(). - protected List<TScanRangeLocations> scanRanges_; + protected List<TScanRangeLocationList> scanRanges_; public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) { super(id, desc.getId().asList(), displayName); @@ -82,7 +82,7 @@ abstract public class ScanNode extends PlanNode { /** * Returns all scan ranges plus their locations. 
*/ - public List<TScanRangeLocations> getScanRangeLocations() { + public List<TScanRangeLocationList> getScanRangeLocations() { Preconditions.checkNotNull(scanRanges_, "Need to call init() first."); return scanRanges_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 4bb51ad..a974278 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -919,27 +919,15 @@ public class Frontend { TPlanExecInfo result = new TPlanExecInfo(); ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder(); - // map from fragment to its index in TPlanExecInfo.fragments; needed for - // TPlanExecInfo.dest_fragment_idx + // collect ScanNodes List<ScanNode> scanNodes = Lists.newArrayList(); - Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap(); - for (int idx = 0; idx < fragments.size(); ++idx) { - PlanFragment fragment = fragments.get(idx); + for (PlanFragment fragment: fragments) { Preconditions.checkNotNull(fragment.getPlanRoot()); fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes); - fragmentIdx.put(fragment, idx); - } - - // set fragment destinations - for (int i = 1; i < fragments.size(); ++i) { - PlanFragment dest = fragments.get(i).getDestFragment(); - Integer idx = fragmentIdx.get(dest); - Preconditions.checkState(idx != null); - result.addToDest_fragment_idx(idx.intValue()); } // Set scan ranges/locations for scan nodes. - LOG.debug("get scan range locations"); + LOG.trace("get scan range locations"); Set<TTableName> tablesMissingStats = Sets.newTreeSet(); Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); for (ScanNode scanNode: scanNodes) { @@ -960,6 +948,16 @@ public class Frontend { queryCtx.addToTables_with_corrupt_stats(tableName); } + // Compute resource requirements after scan range locations because the cost + // estimates of scan nodes rely on them. + try { + planner.computeResourceReqs(fragments, true, queryExecRequest); + } catch (Exception e) { + // Turn exceptions into a warning to allow the query to execute. + LOG.error("Failed to compute resource requirements for query\n" + + queryCtx.request.getStmt(), e); + } + // The fragment at this point has all state set, serialize it to thrift. for (PlanFragment fragment: fragments) { TPlanFragment thriftFragment = fragment.toThrift(); @@ -970,144 +968,54 @@ public class Frontend { } /** - * Create a populated TQueryExecRequest, corresponding to the supplied planner, - * for multi-threaded execution. + * Create a populated TQueryExecRequest, corresponding to the supplied planner. 
*/ - private TQueryExecRequest mtCreateExecRequest( - Planner planner, StringBuilder explainString) - throws ImpalaException { + private TQueryExecRequest createExecRequest( + Planner planner, StringBuilder explainString) throws ImpalaException { TQueryCtx queryCtx = planner.getQueryCtx(); - Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0); - // for now, always disable spilling in the backend - // TODO-MT: re-enable spilling - queryCtx.setDisable_spilling(true); - TQueryExecRequest result = new TQueryExecRequest(); - - LOG.debug("create mt plan"); - List<PlanFragment> planRoots = planner.createParallelPlans(); + AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult(); + boolean isMtExec = + analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0; - // create EXPLAIN output - result.setQuery_ctx(queryCtx); // needed by getExplainString() - explainString.append( - planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result)); - result.setQuery_plan(explainString.toString()); + List<PlanFragment> planRoots = Lists.newArrayList(); + TQueryExecRequest result = new TQueryExecRequest(); + if (isMtExec) { + LOG.debug("create mt plan"); + planRoots.addAll(planner.createParallelPlans()); + } else { + LOG.debug("create plan"); + planRoots.add(planner.createPlan().get(0)); + } // create per-plan exec info; // also assemble list of names of tables with missing or corrupt stats for // assembling a warning message for (PlanFragment planRoot: planRoots) { - result.addToMt_plan_exec_info( + result.addToPlan_exec_info( createPlanExecInfo(planRoot, planner, queryCtx, result)); } - // assign fragment ids - int idx = 0; - for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) { - for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++); - } - - // TODO-MT: implement - // Compute resource requirements after scan range locations because the cost - // estimates of scan nodes rely on them. - //try { - //planner.computeResourceReqs(fragments, true, queryExecRequest); - //} catch (Exception e) { - //// Turn exceptions into a warning to allow the query to execute. - //LOG.error("Failed to compute resource requirements for query\n" + - //queryCtx.request.getStmt(), e); - //} - - return result; - } - - /** - * Create a populated TQueryExecRequest corresponding to the supplied TQueryCtx. 
- * TODO-MT: remove this function and rename mtCreateExecRequest() to - * createExecRequest() - */ - private TQueryExecRequest createExecRequest( - Planner planner, StringBuilder explainString) - throws ImpalaException { - LOG.debug("create plan"); - ArrayList<PlanFragment> fragments = planner.createPlan(); - - List<ScanNode> scanNodes = Lists.newArrayList(); - // map from fragment to its index in queryExecRequest.fragments; needed for - // queryExecRequest.dest_fragment_idx - Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap(); - - for (int idx = 0; idx < fragments.size(); ++idx) { - PlanFragment fragment = fragments.get(idx); - Preconditions.checkNotNull(fragment.getPlanRoot()); - fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes); - fragmentIdx.put(fragment, idx); - } - - TQueryExecRequest result = new TQueryExecRequest(); - // set fragment destinations - for (int i = 1; i < fragments.size(); ++i) { - PlanFragment dest = fragments.get(i).getDestFragment(); - Integer idx = fragmentIdx.get(dest); - Preconditions.checkState(idx != null); - result.addToDest_fragment_idx(idx.intValue()); - } - - // Set scan ranges/locations for scan nodes. - // Also assemble list of tables names missing stats for assembling a warning message. - LOG.debug("get scan range locations"); - Set<TTableName> tablesMissingStats = Sets.newTreeSet(); - // Assemble a similar list for corrupt stats - Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); - for (ScanNode scanNode: scanNodes) { - result.putToPer_node_scan_ranges( - scanNode.getId().asInt(), scanNode.getScanRangeLocations()); - if (scanNode.isTableMissingStats()) { - tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift()); - } - if (scanNode.hasCorruptTableStats()) { - tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift()); - } - } - - TQueryCtx queryCtx = planner.getQueryCtx(); - for (TTableName tableName: tablesMissingStats) { - queryCtx.addToTables_missing_stats(tableName); - } - for (TTableName tableName: tablesWithCorruptStats) { - queryCtx.addToTables_with_corrupt_stats(tableName); - } - // Optionally disable spilling in the backend. Allow spilling if there are plan hints // or if all tables have stats. - AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult(); - if (queryCtx.request.query_options.isDisable_unsafe_spills() - && !tablesMissingStats.isEmpty() - && !analysisResult.getAnalyzer().hasPlanHints()) { - queryCtx.setDisable_spilling(true); - } - - // Compute resource requirements after scan range locations because the cost - // estimates of scan nodes rely on them. - try { - planner.computeResourceReqs(fragments, true, result); - } catch (Exception e) { - // Turn exceptions into a warning to allow the query to execute. - LOG.error("Failed to compute resource requirements for query\n" + - queryCtx.request.getStmt(), e); - } - - // The fragment at this point has all state set, assign sequential ids - // and serialize to thrift. 
- for (int i = 0; i < fragments.size(); ++i) {
- PlanFragment fragment = fragments.get(i);
- TPlanFragment thriftFragment = fragment.toThrift();
- thriftFragment.setIdx(i);
- result.addToFragments(thriftFragment);
+ boolean disableSpilling =
+ queryCtx.request.query_options.isDisable_unsafe_spills()
+ && !queryCtx.tables_missing_stats.isEmpty()
+ && !analysisResult.getAnalyzer().hasPlanHints();
+ // for now, always disable spilling for multi-threaded execution
+ if (isMtExec || disableSpilling) queryCtx.setDisable_spilling(true);
+
+ // assign fragment idx
+ int idx = 0;
+ for (TPlanExecInfo planExecInfo: result.plan_exec_info) {
+ for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
}
+ // create EXPLAIN output after setting everything else
result.setQuery_ctx(queryCtx); // needed by getExplainString()
- explainString.append(planner.getExplainString(fragments, result));
+ ArrayList<PlanFragment> allFragments = planRoots.get(0).getNodesPreOrder();
+ explainString.append(planner.getExplainString(allFragments, result));
result.setQuery_plan(explainString.toString());
+
return result;
}
@@ -1156,12 +1064,7 @@ public class Frontend {
|| analysisResult.isDeleteStmt());
Planner planner = new Planner(analysisResult, queryCtx);
- TQueryExecRequest queryExecRequest;
- if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) {
- queryExecRequest = mtCreateExecRequest(planner, explainString);
- } else {
- queryExecRequest = createExecRequest(planner, explainString);
- }
+ TQueryExecRequest queryExecRequest = createExecRequest(planner, explainString);
queryExecRequest.setDesc_tbl(
planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
queryExecRequest.setQuery_ctx(queryCtx);
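After this change there is a single createExecRequest() path: it branches on mt_dop only to decide between createPlan() and createParallelPlans(), attaches one TPlanExecInfo per plan root via addToPlan_exec_info(), and assigns fragment indexes from one counter that runs across all plan_exec_info entries. The sketch below is not part of the patch; it only illustrates, using the Thrift fields visible in the diff (plan_exec_info, fragments, idx), how a caller could walk the resulting request. The class name and the assertion are hypothetical.

import java.util.List;

import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TQueryExecRequest;

public class PlanExecInfoWalker {
  // Counts fragments across all plans of a TQueryExecRequest produced by the merged
  // createExecRequest(). Because fragment idx values come from a single running
  // counter, getIdx() is expected to equal the running fragment count here.
  public static int countFragments(TQueryExecRequest request) {
    int numFragments = 0;
    List<TPlanExecInfo> planExecInfos = request.plan_exec_info;
    if (planExecInfos == null) return 0;
    for (TPlanExecInfo planExecInfo : planExecInfos) {
      for (TPlanFragment fragment : planExecInfo.fragments) {
        assert fragment.getIdx() == numFragments;
        ++numFragments;
      }
    }
    return numFragments;
  }
}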
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index a94901b..745efa3 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -57,12 +57,13 @@ import org.apache.impala.thrift.THdfsScanNode;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableSink;
import org.apache.impala.thrift.TTupleDescriptor;
@@ -133,9 +134,11 @@ public class PlannerTestBase extends FrontendTestBase {
planMap_.clear();
tupleMap_.clear();
tableMap_.clear();
- for (TPlanFragment frag: execRequest.fragments) {
- for (TPlanNode node: frag.plan.nodes) {
- planMap_.put(node.node_id, node);
+ for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+ for (TPlanFragment frag: execInfo.fragments) {
+ for (TPlanNode node: frag.plan.nodes) {
+ planMap_.put(node.node_id, node);
+ }
}
}
if (execRequest.isSetDesc_tbl()) {
@@ -194,24 +197,28 @@ public class PlannerTestBase extends FrontendTestBase {
long insertTableId = -1;
// Collect all partitions that are referenced by a scan range.
Set<THdfsPartition> scanRangePartitions = Sets.newHashSet();
- if (execRequest.per_node_scan_ranges != null) {
- for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
- execRequest.per_node_scan_ranges.entrySet()) {
- if (entry.getValue() == null) {
- continue;
- }
- for (TScanRangeLocations locations: entry.getValue()) {
- if (locations.scan_range.isSetHdfs_file_split()) {
- THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
- THdfsPartition partition = findPartition(entry.getKey(), split);
- scanRangePartitions.add(partition);
+ for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+ if (execInfo.per_node_scan_ranges != null) {
+ for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
+ execInfo.per_node_scan_ranges.entrySet()) {
+ if (entry.getValue() == null) {
+ continue;
+ }
+ for (TScanRangeLocationList locationList: entry.getValue()) {
+ if (locationList.scan_range.isSetHdfs_file_split()) {
+ THdfsFileSplit split = locationList.scan_range.getHdfs_file_split();
+ THdfsPartition partition = findPartition(entry.getKey(), split);
+ scanRangePartitions.add(partition);
+ }
}
}
}
}
+
if (execRequest.isSetFinalize_params()) {
insertTableId = execRequest.getFinalize_params().getTable_id();
}
+
boolean first = true;
// Iterate through all partitions of the descriptor table and verify all partitions
// are referenced.
@@ -241,64 +248,62 @@ public class PlannerTestBase extends FrontendTestBase {
*/
private StringBuilder printScanRangeLocations(TQueryExecRequest execRequest) {
StringBuilder result = new StringBuilder();
- if (execRequest.per_node_scan_ranges == null) {
- return result;
- }
- for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
- execRequest.per_node_scan_ranges.entrySet()) {
- result.append("NODE " + entry.getKey().toString() + ":\n");
- if (entry.getValue() == null) {
- continue;
- }
-
- for (TScanRangeLocations locations: entry.getValue()) {
- // print scan range
- result.append(" ");
- if (locations.scan_range.isSetHdfs_file_split()) {
- THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
- THdfsTable table = findTable(entry.getKey());
- THdfsPartition partition = table.getPartitions().get(split.partition_id);
- THdfsPartitionLocation location = partition.getLocation();
- String file_location = location.getSuffix();
- if (location.prefix_index != -1) {
- file_location =
- table.getPartition_prefixes().get(location.prefix_index) + file_location;
- }
- Path filePath = new Path(file_location, split.file_name);
- filePath = cleanseFilePath(filePath);
- result.append("HDFS SPLIT " + filePath.toString() + " "
- + Long.toString(split.offset) + ":" + Long.toString(split.length));
- }
- if (locations.scan_range.isSetHbase_key_range()) {
- THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
- Integer hostIdx = locations.locations.get(0).host_idx;
- TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
- result.append("HBASE KEYRANGE ");
- result.append("port=" + networkAddress.port + " ");
- if (keyRange.isSetStartKey()) {
- result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
- } else {
- result.append("<unbounded>");
+ for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+ if (execInfo.per_node_scan_ranges == null) continue;
+ for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
+ execInfo.per_node_scan_ranges.entrySet()) {
+ result.append("NODE " + entry.getKey().toString() + ":\n");
+ if (entry.getValue() == null) continue;
+
+ for (TScanRangeLocationList locations: entry.getValue()) {
+ // print scan range
+ result.append(" ");
+ if (locations.scan_range.isSetHdfs_file_split()) {
+ THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
+ THdfsTable table = findTable(entry.getKey());
+ THdfsPartition partition = table.getPartitions().get(split.partition_id);
+ THdfsPartitionLocation location = partition.getLocation();
+ String file_location = location.getSuffix();
+ if (location.prefix_index != -1) {
+ file_location =
+ table.getPartition_prefixes().get(location.prefix_index) + file_location;
+ }
+ Path filePath = new Path(file_location, split.file_name);
+ filePath = cleanseFilePath(filePath);
+ result.append("HDFS SPLIT " + filePath.toString() + " "
+ + Long.toString(split.offset) + ":" + Long.toString(split.length));
}
- result.append(":");
- if (keyRange.isSetStopKey()) {
- result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
- } else {
- result.append("<unbounded>");
+ if (locations.scan_range.isSetHbase_key_range()) {
+ THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
+ Integer hostIdx = locations.locations.get(0).host_idx;
+ TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
+ result.append("HBASE KEYRANGE ");
+ result.append("port=" + networkAddress.port + " ");
+ if (keyRange.isSetStartKey()) {
+ result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
+ } else {
+ result.append("<unbounded>");
+ }
+ result.append(":");
+ if (keyRange.isSetStopKey()) {
+ result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
+ } else {
+ result.append("<unbounded>");
+ }
}
- }
- if (locations.scan_range.isSetKudu_scan_token()) {
- Preconditions.checkNotNull(kuduClient_,
- "Test should not be invoked on platforms that do not support Kudu.");
- try {
- result.append(KuduScanToken.stringifySerializedToken(
- locations.scan_range.kudu_scan_token.array(), kuduClient_));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to parse Kudu scan token", e);
+ if (locations.scan_range.isSetKudu_scan_token()) {
+ Preconditions.checkNotNull(kuduClient_,
+ "Test should not be invoked on platforms that do not support Kudu.");
+ try {
+ result.append(KuduScanToken.stringifySerializedToken(
+ locations.scan_range.kudu_scan_token.array(), kuduClient_));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to parse Kudu scan token", e);
+ }
}
+ result.append("\n");
}
- result.append("\n");
}
}
return result;
@@ -446,8 +451,8 @@ public class PlannerTestBase extends FrontendTestBase {
}
}
- if (execRequest.isSetFragments() && !execRequest.fragments.isEmpty()) {
- TPlanFragment firstPlanFragment = execRequest.fragments.get(0);
+ if (execRequest.isSetPlan_exec_info() && !execRequest.plan_exec_info.isEmpty()) {
+ TPlanFragment firstPlanFragment = execRequest.plan_exec_info.get(0).fragments.get(0);
if (firstPlanFragment.isSetOutput_sink()
&& firstPlanFragment.output_sink.isSetTable_sink()) {
TTableSink tableSink = firstPlanFragment.output_sink.table_sink;
@@ -552,7 +557,7 @@ public class PlannerTestBase extends FrontendTestBase {
// Query exec request may not be set for DDL, e.g., CTAS.
String locationsStr = null;
if (execRequest != null && execRequest.isSetQuery_exec_request()) {
- if (execRequest.query_exec_request.fragments == null) return;
+ if (execRequest.query_exec_request.plan_exec_info == null) return;
buildMaps(execRequest.query_exec_request);
// If we optimize the partition key scans, we may get all the partition key values
// from the metadata and don't reference any table. Skip the check in this case.
@@ -614,27 +619,29 @@ public class PlannerTestBase extends FrontendTestBase {
if (execRequest == null) return;
if (!execRequest.isSetQuery_exec_request()
|| execRequest.query_exec_request == null
- || execRequest.query_exec_request.fragments == null) {
+ || execRequest.query_exec_request.plan_exec_info == null) {
return;
}
- for (TPlanFragment planFragment : execRequest.query_exec_request.fragments) {
- if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
- for (TPlanNode node : planFragment.plan.nodes) {
- if (!node.isSetLimit() || -1 == node.limit) continue;
- if (!node.isSetEstimated_stats() || node.estimated_stats == null) continue;
- if (node.limit < node.estimated_stats.cardinality) {
- StringBuilder limitCardinalityError = new StringBuilder();
- limitCardinalityError.append("Query: " + query + "\n");
- limitCardinalityError.append(
- "Expected cardinality estimate less than or equal to LIMIT: "
- + node.limit + "\n");
- limitCardinalityError.append(
- "Actual cardinality estimate: "
- + node.estimated_stats.cardinality + "\n");
- limitCardinalityError.append(
- "In node id "
- + node.node_id + "\n");
- errorLog.append(limitCardinalityError.toString());
+ for (TPlanExecInfo execInfo : execRequest.query_exec_request.plan_exec_info) {
+ for (TPlanFragment planFragment : execInfo.fragments) {
+ if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
+ for (TPlanNode node : planFragment.plan.nodes) {
+ if (!node.isSetLimit() || -1 == node.limit) continue;
+ if (!node.isSetEstimated_stats() || node.estimated_stats == null) continue;
+ if (node.limit < node.estimated_stats.cardinality) {
+ StringBuilder limitCardinalityError = new StringBuilder();
+ limitCardinalityError.append("Query: " + query + "\n");
+ limitCardinalityError.append(
+ "Expected cardinality estimate less than or equal to LIMIT: "
+ + node.limit + "\n");
+ limitCardinalityError.append(
+ "Actual cardinality estimate: "
+ + node.estimated_stats.cardinality + "\n");
+ limitCardinalityError.append(
+ "In node id "
+ + node.node_id + "\n");
+ errorLog.append(limitCardinalityError.toString());
+ }
}
}
}
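With this change, scan ranges are attached to each TPlanExecInfo rather than to the TQueryExecRequest itself, and the per-location Thrift type is now TScanRangeLocationList, so a test helper that wants to visit every HDFS split needs the extra outer loop over plan_exec_info, mirroring the updated PlannerTestBase code above. The helper below is only an illustrative sketch (the class and method names are made up), built from the same fields the test code uses.

import java.util.List;
import java.util.Map;

import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TScanRangeLocationList;

public class ScanRangeCounter {
  // Walks plan_exec_info -> per_node_scan_ranges -> scan range location lists and
  // counts the HDFS file splits, skipping nodes without scan ranges.
  public static int countHdfsSplits(TQueryExecRequest execRequest) {
    int numSplits = 0;
    for (TPlanExecInfo execInfo : execRequest.plan_exec_info) {
      if (execInfo.per_node_scan_ranges == null) continue;
      for (Map.Entry<Integer, List<TScanRangeLocationList>> entry :
          execInfo.per_node_scan_ranges.entrySet()) {
        if (entry.getValue() == null) continue;
        for (TScanRangeLocationList locationList : entry.getValue()) {
          if (locationList.scan_range.isSetHdfs_file_split()) ++numSplits;
        }
      }
    }
    return numSplits;
  }
}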
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index fe25599..6ce43db 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -70,23 +70,23 @@ PLAN-ROOT SINK
|
02:TOP-N [LIMIT=10]
| order by: count(int_col) ASC, bigint_col ASC
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=160B
| tuple-ids=2 row-size=16B cardinality=10
|
04:AGGREGATE [FINALIZE]
| output: count:merge(int_col)
| group by: bigint_col
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=128.00MB
| tuple-ids=1 row-size=16B cardinality=unavailable
|
03:EXCHANGE [HASH(bigint_col)]
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=1 row-size=16B cardinality=unavailable
|
01:AGGREGATE [STREAMING]
| output: count(int_col)
| group by: bigint_col
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=128.00MB
| tuple-ids=1 row-size=16B cardinality=unavailable
|
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -94,7 +94,7 @@ PLAN-ROOT SINK
predicates: id < 10
table stats: unavailable
column stats: unavailable
- hosts=3 per-host-mem=unavailable
+ hosts=3 per-host-mem=16.00MB
tuple-ids=0 row-size=16B cardinality=unavailable
====
# Single-table scan/filter/analysic should work.
@@ -136,16 +136,16 @@ PLAN-ROOT SINK
| partition by: int_col
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=3,2 row-size=16B cardinality=unavailable
|
01:SORT
| order by: int_col ASC NULLS FIRST, id ASC
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=3 row-size=8B cardinality=unavailable
|
03:EXCHANGE [HASH(int_col)]
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=0 row-size=8B cardinality=unavailable
|
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -153,7 +153,7 @@ PLAN-ROOT SINK
predicates: id < 10
table stats: unavailable
column stats: unavailable
- hosts=3 per-host-mem=unavailable
+ hosts=3 per-host-mem=16.00MB
tuple-ids=0 row-size=8B cardinality=unavailable
====
# Nested-loop join in a subplan should work.
@@ -216,39 +216,39 @@ PLAN-ROOT SINK
| tuple-ids=2,1,0 row-size=562B cardinality=1500000
|
01:SUBPLAN
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=2,1,0 row-size=562B cardinality=1500000
|
|--08:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=254B
| | tuple-ids=2,1,0 row-size=562B cardinality=100
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=0 row-size=254B cardinality=1
| |
| 04:SUBPLAN
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=2,1 row-size=308B cardinality=100
| |
| |--07:NESTED LOOP JOIN [CROSS JOIN]
-| | | hosts=3 per-host-mem=unavailable
+| | | hosts=3 per-host-mem=124B
| | | tuple-ids=2,1 row-size=308B cardinality=10
| | |
| | |--05:SINGULAR ROW SRC
| | | parent-subplan=04
-| | | hosts=3 per-host-mem=unavailable
+| | | hosts=3 per-host-mem=0B
| | | tuple-ids=1 row-size=124B cardinality=1
| | |
| | 06:UNNEST [o.o_lineitems]
| | parent-subplan=04
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 03:UNNEST [c.c_orders o]
| parent-subplan=01
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -258,7 +258,7 @@ PLAN-ROOT SINK
predicates on o_lineitems: l_linenumber < 3
table stats: 150000 rows total
columns missing stats: c_orders
- hosts=3 per-host-mem=unavailable
+ hosts=3 per-host-mem=88.00MB
tuple-ids=0 row-size=254B cardinality=15000
====
# Hash-join in a subplan should work.
@@ -312,31 +312,31 @@ PLAN-ROOT SINK
| tuple-ids=1,0,2 row-size=286B cardinality=1500000
|
01:SUBPLAN
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=1,0,2 row-size=286B cardinality=1500000
|
|--06:HASH JOIN [INNER JOIN]
| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=1,0,2 row-size=286B cardinality=10
| |
| |--04:UNNEST [c.c_orders o2]
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 05:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=270B
| | tuple-ids=1,0 row-size=278B cardinality=10
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | hosts=3 per-host-mem=0B
| | tuple-ids=0 row-size=270B cardinality=1
| |
| 03:UNNEST [c.c_orders o1]
| parent-subplan=01
-| hosts=3 per-host-mem=unavailable
+| hosts=3 per-host-mem=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -345,6 +345,6 @@ PLAN-ROOT SINK
predicates on o1: o1.o_orderkey < 5
table stats: 150000 rows total
columns missing stats: c_orders, c_orders
- hosts=3 per-host-mem=unavailable
+ hosts=3 per-host-mem=88.00MB
tuple-ids=0 row-size=270B cardinality=150000
====
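The updated expected plans above report concrete per-host memory estimates (for example 128.00MB for the grouping aggregations and 16.00MB or 88.00MB for the scans) where the old golden files said per-host-mem=unavailable. The snippet below is purely illustrative and not part of the patch: it shows one way golden EXPLAIN text could be sanity-checked for leftover "unavailable" estimates; the class and method names are made up.

public final class ExplainOutputChecks {
  private ExplainOutputChecks() {}

  // Returns true if no plan node line in the EXPLAIN output still reports
  // per-host-mem=unavailable.
  public static boolean allPerHostMemEstimatesSet(String explainOutput) {
    for (String line : explainOutput.split("\n")) {
      if (line.contains("per-host-mem=unavailable")) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    // Sample line taken from the updated expected plan above.
    String sample = "|  hosts=3 per-host-mem=128.00MB";
    System.out.println(allPerHostMemEstimatesSet(sample));  // prints true
  }
}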
