Repository: incubator-impala
Updated Branches:
  refs/heads/master 29faca568 -> 9b507b6ed


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index bc500c8..9ffc273 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -31,7 +31,6 @@
 #include "common/logging.h"
 #include "util/metrics.h"
 #include "runtime/exec-env.h"
-#include "runtime/coordinator.h"
 #include "service/impala-server.h"
 
 #include "statestore/statestore-subscriber.h"
@@ -290,55 +289,17 @@ SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const {
   return backend_config;
 }
 
-void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
-{
+void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config) {
   lock_guard<mutex> l(backend_config_lock_);
   backend_config_ = backend_config;
 }
 
-Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
-    QuerySchedule* schedule) {
-  map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
-  RuntimeProfile::Counter* total_assignment_timer =
-      ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
-
-  BackendConfigPtr backend_config = GetBackendConfig();
-
-  for (entry = exec_request.per_node_scan_ranges.begin();
-      entry != exec_request.per_node_scan_ranges.end(); ++entry) {
-    const TPlanNodeId node_id = entry->first;
-    FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id);
-    const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
-    bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
-
-    const TPlanNode& node = fragment.plan.nodes[schedule->GetNodeIdx(node_id)];
-    DCHECK_EQ(node.node_id, node_id);
-
-    const TReplicaPreference::type* node_replica_preference = node.__isset.hdfs_scan_node
-        && node.hdfs_scan_node.__isset.replica_preference
-        ? &node.hdfs_scan_node.replica_preference : NULL;
-
-    bool node_random_replica = node.__isset.hdfs_scan_node
-        && node.hdfs_scan_node.__isset.random_replica
-        && node.hdfs_scan_node.random_replica;
-
-    FragmentScanRangeAssignment* assignment =
-        &(*schedule->exec_params())[fragment_idx].scan_range_assignment;
-    RETURN_IF_ERROR(ComputeScanRangeAssignment(*backend_config,
-        node_id, node_replica_preference, node_random_replica, entry->second,
-        exec_request.host_list, exec_at_coord, schedule->query_options(),
-        total_assignment_timer, assignment));
-    schedule->IncNumScanRanges(entry->second.size());
-  }
-  return Status::OK();
-}
-
-Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) {
+Status SimpleScheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
       ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
   BackendConfigPtr backend_config = GetBackendConfig();
   const TQueryExecRequest& exec_request = schedule->request();
-  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: exec_request.plan_exec_info) {
     for (const auto& entry: plan_exec_info.per_node_scan_ranges) {
       const TPlanNodeId node_id = entry.first;
       const TPlanFragment& fragment = schedule->GetContainingFragment(node_id);
@@ -366,38 +327,34 @@ Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) {
   return Status::OK();
 }
 
-void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) {
+void SimpleScheduler::ComputeFragmentExecParams(QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
   // for each plan, compute the FInstanceExecParams for the tree of fragments
-  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: exec_request.plan_exec_info) {
     // set instance_id, host, per_node_scan_ranges
-    MtComputeFragmentExecParams(
-        plan_exec_info,
+    ComputeFragmentExecParams(plan_exec_info,
         schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx),
         schedule);
 
     // Set destinations, per_exch_num_senders, sender_id.
-    // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]];
-    // fragments[0] is an endpoint.
-    for (int i = 0; i < plan_exec_info.dest_fragment_idx.size(); ++i) {
-      int dest_idx = plan_exec_info.dest_fragment_idx[i];
+    for (const TPlanFragment& src_fragment: plan_exec_info.fragments) {
+      if (!src_fragment.output_sink.__isset.stream_sink) continue;
+      FragmentIdx dest_idx =
+          schedule->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
       DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
       const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
-      DCHECK_LT(i + 1, plan_exec_info.fragments.size());
-      const TPlanFragment& src_fragment = plan_exec_info.fragments[i + 1];
-      DCHECK(src_fragment.output_sink.__isset.stream_sink);
-      MtFragmentExecParams* dest_params =
+      FragmentExecParams* dest_params =
           schedule->GetFragmentExecParams(dest_fragment.idx);
-      MtFragmentExecParams* src_params =
+      FragmentExecParams* src_params =
           schedule->GetFragmentExecParams(src_fragment.idx);
 
       // populate src_params->destinations
       src_params->destinations.resize(dest_params->instance_exec_params.size());
-      for (int j = 0; j < dest_params->instance_exec_params.size(); ++j) {
-        TPlanFragmentDestination& dest = src_params->destinations[j];
-        dest.__set_fragment_instance_id(dest_params->instance_exec_params[j].instance_id);
-        dest.__set_server(dest_params->instance_exec_params[j].host);
+      for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) {
+        TPlanFragmentDestination& dest = src_params->destinations[i];
+        dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
+        dest.__set_server(dest_params->instance_exec_params[i].host);
       }
 
       // enumerate senders consecutively;
@@ -411,26 +368,24 @@ void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) {
       int sender_id_base = dest_params->per_exch_num_senders[exch_id];
       dest_params->per_exch_num_senders[exch_id] +=
           src_params->instance_exec_params.size();
-      for (int j = 0; j < src_params->instance_exec_params.size(); ++j) {
-        FInstanceExecParams& src_instance_params = src_params->instance_exec_params[j];
-        src_instance_params.sender_id = sender_id_base + j;
+      for (int i = 0; i < src_params->instance_exec_params.size(); ++i) {
+        FInstanceExecParams& src_instance_params = src_params->instance_exec_params[i];
+        src_instance_params.sender_id = sender_id_base + i;
       }
     }
   }
 }
 
-void SimpleScheduler::MtComputeFragmentExecParams(
-    const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params,
+void SimpleScheduler::ComputeFragmentExecParams(
+    const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
   // traverse input fragments
   for (FragmentIdx input_fragment_idx: fragment_params->input_fragments) {
-    MtComputeFragmentExecParams(
+    ComputeFragmentExecParams(
         plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule);
   }
 
   const TPlanFragment& fragment = fragment_params->fragment;
-  // TODO: deal with Union nodes
-  DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
   // case 1: single instance executed at coordinator
   if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
     const TNetworkAddress& coord = local_backend_descriptor_.address;
@@ -439,7 +394,7 @@ void SimpleScheduler::MtComputeFragmentExecParams(
         ? schedule->query_id()
         : schedule->GetNextInstanceId();
     fragment_params->instance_exec_params.emplace_back(
-      instance_id, coord, 0, *fragment_params);
+        instance_id, coord, 0, *fragment_params);
     FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
 
     // That instance gets all of the scan ranges, if there are any.
@@ -448,25 +403,81 @@ void SimpleScheduler::MtComputeFragmentExecParams(
       auto first_entry = fragment_params->scan_range_assignment.begin();
       instance_params.per_node_scan_ranges = first_entry->second;
     }
+
+    return;
+  }
+
+  if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
+    CreateUnionInstances(fragment_params, schedule);
+    return;
+  }
+
+  PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
+  if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
+    // case 2: leaf fragment with leftmost scan
+    // TODO: check that there's only one scan in this fragment
+    CreateScanInstances(leftmost_scan_id, fragment_params, schedule);
   } else {
-    PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
-    if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
-      // case 2: leaf fragment with leftmost scan
-      // TODO: check that there's only one scan in this fragment
-      MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule);
-    } else {
-      // case 3: interior fragment without leftmost scan
-      // we assign the same hosts as those of our leftmost input fragment (so that a
-      // merge aggregation fragment runs on the hosts that provide the input data)
-      MtCreateMirrorInstances(fragment_params, schedule);
+    // case 3: interior fragment without leftmost scan
+    // we assign the same hosts as those of our leftmost input fragment (so that a
+    // merge aggregation fragment runs on the hosts that provide the input data)
+    CreateCollocatedInstances(fragment_params, schedule);
+  }
+}
+
+void SimpleScheduler::CreateUnionInstances(
+    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  const TPlanFragment& fragment = fragment_params->fragment;
+  DCHECK(ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
+
+  // Add hosts of scan nodes.
+  vector<TPlanNodeType::type> scan_node_types {
+    TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+    TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
+  vector<TPlanNodeId> scan_node_ids;
+  FindNodes(fragment.plan, scan_node_types, &scan_node_ids);
+  vector<TNetworkAddress> scan_hosts;
+  for (TPlanNodeId id: scan_node_ids) GetScanHosts(id, *fragment_params, &scan_hosts);
+
+  unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());
+
+  // Add hosts of input fragments.
+  for (FragmentIdx idx: fragment_params->input_fragments) {
+    const FragmentExecParams& input_params = *schedule->GetFragmentExecParams(idx);
+    for (const FInstanceExecParams& instance_params: input_params.instance_exec_params) {
+      hosts.insert(instance_params.host);
+    }
+  }
+  DCHECK(!hosts.empty())
+      << "no hosts for fragment " << fragment.idx << " with a UnionNode";
+
+  // create a single instance per host
+  // TODO-MT: figure out how to parallelize Union
+  int per_fragment_idx = 0;
+  for (const TNetworkAddress& host: hosts) {
+    fragment_params->instance_exec_params.emplace_back(
+        schedule->GetNextInstanceId(), host, per_fragment_idx++, *fragment_params);
+    // assign all scan ranges
+    FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+    if (fragment_params->scan_range_assignment.count(host) > 0) {
+      instance_params.per_node_scan_ranges = fragment_params->scan_range_assignment[host];
     }
   }
 }
 
-void SimpleScheduler::MtCreateScanInstances(
-    PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params,
+void SimpleScheduler::CreateScanInstances(
+    PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
   int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop;
+  if (max_num_instances == 0) max_num_instances = 1;
+
+  if (fragment_params->scan_range_assignment.empty()) {
+    // this scan doesn't have any scan ranges: run a single instance on the coordinator
+    fragment_params->instance_exec_params.emplace_back(
+        schedule->GetNextInstanceId(), local_backend_descriptor_.address, 0, *fragment_params);
+    return;
+  }
+
   for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
     // evenly divide up the scan ranges of the leftmost scan between at most
     // <dop> instances
@@ -477,9 +488,13 @@ void SimpleScheduler::MtCreateScanInstances(
 
     int64 total_size = 0;
     for (const TScanRangeParams& params: params_list) {
-      // TODO: implement logic for hbase and kudu
-      DCHECK(params.scan_range.__isset.hdfs_file_split);
-      total_size += params.scan_range.hdfs_file_split.length;
+      if (params.scan_range.__isset.hdfs_file_split) {
+        total_size += params.scan_range.hdfs_file_split.length;
+      } else {
+        // fake load-balancing for Kudu and Hbase: every split has length 1
+        // TODO: implement more accurate logic for Kudu and Hbase
+        ++total_size;
+      }
     }
 
     // try to load-balance scan ranges by assigning just beyond the average number of
@@ -507,7 +522,12 @@ void SimpleScheduler::MtCreateScanInstances(
         const TScanRangeParams& scan_range_params = params_list[params_idx];
         instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
             scan_range_params);
-        total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
+        if (scan_range_params.scan_range.__isset.hdfs_file_split) {
+          total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
+        } else {
+          // for Kudu and Hbase every split has length 1
+          ++total_assigned_bytes;
+        }
         ++params_idx;
       }
       if (params_idx >= params_list.size()) break;  // nothing left to assign
@@ -516,10 +536,10 @@ void SimpleScheduler::MtCreateScanInstances(
   }
 }
 
-void SimpleScheduler::MtCreateMirrorInstances(
-    MtFragmentExecParams* fragment_params, QuerySchedule* schedule) {
+void SimpleScheduler::CreateCollocatedInstances(
+    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
   DCHECK_GE(fragment_params->input_fragments.size(), 1);
-  const MtFragmentExecParams* input_fragment_params =
+  const FragmentExecParams* input_fragment_params =
       schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
   int per_fragment_instance_idx = 0;
   for (const FInstanceExecParams& input_instance_params:
@@ -533,7 +553,7 @@ void SimpleScheduler::MtCreateMirrorInstances(
 Status SimpleScheduler::ComputeScanRangeAssignment(
     const BackendConfig& backend_config, PlanNodeId node_id,
     const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
-    const vector<TScanRangeLocations>& locations,
+    const vector<TScanRangeLocationList>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
     const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
     FragmentScanRangeAssignment* assignment) {
@@ -564,11 +584,11 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   AssignmentCtx assignment_ctx(backend_config, total_assignments_,
       total_local_assignments_);
 
-  vector<const TScanRangeLocations*> remote_scan_range_locations;
+  vector<const TScanRangeLocationList*> remote_scan_range_locations;
 
   // Loop over all scan ranges, select a backend for those with local impalads and collect
   // all others for later processing.
-  for (const TScanRangeLocations& scan_range_locations: locations) {
+  for (const TScanRangeLocationList& scan_range_locations: locations) {
     TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
 
     // Select backend host for the current scan range.
@@ -643,7 +663,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   }  // End of for loop over scan ranges.
 
   // Assign remote scans to backends.
-  for (const TScanRangeLocations* scan_range_locations: remote_scan_range_locations) {
+  for (const TScanRangeLocationList* scan_range_locations: remote_scan_range_locations) {
     const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
     TBackendDescriptor backend;
     assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
@@ -656,132 +676,6 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   return Status::OK();
 }
 
-void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
-    QuerySchedule* schedule) {
-  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
-  // assign instance ids
-  int64_t num_fragment_instances = 0;
-  for (FragmentExecParams& params: *fragment_exec_params) {
-    for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) {
-      params.instance_ids.push_back(
-          CreateInstanceId(schedule->query_id(), num_fragment_instances));
-    }
-  }
-
-  // compute destinations and # senders per exchange node
-  // (the root fragment doesn't have a destination)
-  for (int i = 1; i < fragment_exec_params->size(); ++i) {
-    FragmentExecParams& params = (*fragment_exec_params)[i];
-    int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1];
-    DCHECK_LT(dest_fragment_idx, fragment_exec_params->size());
-    FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx];
-
-    // set # of senders
-    DCHECK(exec_request.fragments[i].output_sink.__isset.stream_sink);
-    const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink;
-    // we can only handle unpartitioned (= broadcast), random-partitioned or
-    // hash-partitioned output at the moment
-    DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
-           || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-           || sink.output_partition.type == TPartitionType::RANDOM);
-    PlanNodeId exch_id = sink.dest_node_id;
-    // we might have multiple fragments sending to this exchange node
-    // (distributed MERGE), which is why we need to add up the #senders
-    params.sender_id_base = dest_params.per_exch_num_senders[exch_id];
-    dest_params.per_exch_num_senders[exch_id] += params.hosts.size();
-
-    // create one TPlanFragmentDestination per destination host
-    params.destinations.resize(dest_params.hosts.size());
-    for (int j = 0; j < dest_params.hosts.size(); ++j) {
-      TPlanFragmentDestination& dest = params.destinations[j];
-      dest.fragment_instance_id = dest_params.instance_ids[j];
-      dest.server = dest_params.hosts[j];
-      VLOG_RPC  << "dest for fragment " << i << ":"
-                << " instance_id=" << dest.fragment_instance_id
-                << " server=" << dest.server;
-    }
-  }
-}
-
-void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
-    QuerySchedule* schedule) {
-  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
-  DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
-
-  // compute hosts of producer fragment before those of consumer fragment(s),
-  // the latter might inherit the set of hosts from the former
-  for (int i = exec_request.fragments.size() - 1; i >= 0; --i) {
-    const TPlanFragment& fragment = exec_request.fragments[i];
-    FragmentExecParams& params = (*fragment_exec_params)[i];
-    if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
-      // all single-node fragments run on the coordinator host
-      params.hosts.push_back(local_backend_descriptor_.address);
-      continue;
-    }
-
-    // UnionNodes are special because they can consume multiple partitioned inputs,
-    // as well as execute multiple scans in the same fragment.
-    // Fragments containing a UnionNode are executed on the union of hosts of all
-    // scans in the fragment as well as the hosts of all its input fragments (s.t.
-    // a UnionNode with partitioned joins or grouping aggregates as children runs on
-    // at least as many hosts as the input to those children).
-    if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
-      vector<TPlanNodeType::type> scan_node_types {
-        TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
-        TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
-      vector<TPlanNodeId> scan_nodes;
-      FindNodes(fragment.plan, scan_node_types, &scan_nodes);
-      vector<TPlanNodeId> exch_nodes;
-      FindNodes(fragment.plan,
-          vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE),
-          &exch_nodes);
-
-      // Add hosts of scan nodes.
-      vector<TNetworkAddress> scan_hosts;
-      for (int j = 0; j < scan_nodes.size(); ++j) {
-        GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts);
-      }
-      unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());
-
-      // Add hosts of input fragments.
-      for (int j = 0; j < exch_nodes.size(); ++j) {
-        int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request);
-        const vector<TNetworkAddress>& input_fragment_hosts =
-            (*fragment_exec_params)[input_fragment_idx].hosts;
-        hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end());
-      }
-      DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode";
-
-      params.hosts.assign(hosts.begin(), hosts.end());
-      continue;
-    }
-
-    PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
-    if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
-      // there is no leftmost scan; we assign the same hosts as those of our
-      // leftmost input fragment (so that a partitioned aggregation fragment
-      // runs on the hosts that provide the input data)
-      int input_fragment_idx = FindLeftmostInputFragment(i, exec_request);
-      DCHECK_GE(input_fragment_idx, 0);
-      DCHECK_LT(input_fragment_idx, fragment_exec_params->size());
-      params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts;
-      // TODO: switch to unpartitioned/coord execution if our input fragment
-      // is executed that way (could have been downgraded from distributed)
-      continue;
-    }
-
-    // This fragment is executed on those hosts that have scan ranges
-    // for the leftmost scan.
-    GetScanHosts(leftmost_scan_id, exec_request, params, &params.hosts);
-  }
-
-  unordered_set<TNetworkAddress> unique_hosts;
-  for (const FragmentExecParams& exec_params: *fragment_exec_params) {
-    unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
-  }
-  schedule->SetUniqueHosts(unique_hosts);
-}
-
 PlanNodeId SimpleScheduler::FindLeftmostNode(
     const TPlan& plan, const vector<TPlanNodeType::type>& types) {
   // the first node with num_children == 0 is the leftmost node
@@ -827,11 +721,17 @@ void SimpleScheduler::FindNodes(const TPlan& plan,
 }
 
 void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
-    const TQueryExecRequest& exec_request, const FragmentExecParams& params,
-    vector<TNetworkAddress>* scan_hosts) {
-  map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry =
-      exec_request.per_node_scan_ranges.find(scan_id);
-  if (entry == exec_request.per_node_scan_ranges.end() || entry->second.empty()) {
+    const FragmentExecParams& params, vector<TNetworkAddress>* scan_hosts) {
+  // Get the list of impalad host from scan_range_assignment_
+  for (const FragmentScanRangeAssignment::value_type& scan_range_assignment:
+      params.scan_range_assignment) {
+    const PerNodeScanRanges& per_node_scan_ranges = scan_range_assignment.second;
+    if (per_node_scan_ranges.find(scan_id) != per_node_scan_ranges.end()) {
+      scan_hosts->push_back(scan_range_assignment.first);
+    }
+  }
+
+  if (scan_hosts->empty()) {
     // this scan node doesn't have any scan ranges; run it on the coordinator
     // TODO: we'll need to revisit this strategy once we can partition joins
     // (in which case this fragment might be executing a right outer join
@@ -839,40 +739,6 @@ void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
     scan_hosts->push_back(local_backend_descriptor_.address);
     return;
   }
-
-  // Get the list of impalad host from scan_range_assignment_
-  for (const FragmentScanRangeAssignment::value_type& scan_range_assignment:
-      params.scan_range_assignment) {
-    scan_hosts->push_back(scan_range_assignment.first);
-  }
-}
-
-int SimpleScheduler::FindLeftmostInputFragment(
-    int fragment_idx, const TQueryExecRequest& exec_request) {
-  // find the leftmost node, which we expect to be an exchage node
-  vector<TPlanNodeType::type> exch_node_type;
-  exch_node_type.push_back(TPlanNodeType::EXCHANGE_NODE);
-  PlanNodeId exch_id =
-      FindLeftmostNode(exec_request.fragments[fragment_idx].plan, exch_node_type);
-  if (exch_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
-    return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
-  }
-  // find the fragment that sends to this exchange node
-  return FindSenderFragment(exch_id, fragment_idx, exec_request);
-}
-
-int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
-    const TQueryExecRequest& exec_request) {
-  for (int i = 0; i < exec_request.dest_fragment_idx.size(); ++i) {
-    if (exec_request.dest_fragment_idx[i] != fragment_idx) continue;
-    const TPlanFragment& input_fragment = exec_request.fragments[i + 1];
-    DCHECK(input_fragment.__isset.output_sink);
-    DCHECK(input_fragment.output_sink.__isset.stream_sink);
-    if (input_fragment.output_sink.stream_sink.dest_node_id == exch_id) return i + 1;
-  }
-  // this shouldn't happen
-  DCHECK(false) << "no fragment sends to exch id " << exch_id;
-  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
 }
 
 Status SimpleScheduler::Schedule(QuerySchedule* schedule) {
@@ -882,29 +748,26 @@ Status SimpleScheduler::Schedule(QuerySchedule* schedule) {
   schedule->set_request_pool(resolved_pool);
   schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
 
-  bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0;
-  if (is_mt_execution) {
-    RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule));
-    MtComputeFragmentExecParams(schedule);
-
-    // compute unique hosts
-    unordered_set<TNetworkAddress> unique_hosts;
-    for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) {
-      for (const FInstanceExecParams& i: f.instance_exec_params) {
-        unique_hosts.insert(i.host);
-      }
-    }
-    schedule->SetUniqueHosts(unique_hosts);
+  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule));
+  ComputeFragmentExecParams(schedule);
+#ifndef NDEBUG
+  schedule->Validate();
+#endif
 
-    // TODO-MT: call AdmitQuery()
-  } else {
-    RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
-    ComputeFragmentHosts(schedule->request(), schedule);
-    ComputeFragmentExecParams(schedule->request(), schedule);
-    if (!FLAGS_disable_admission_control) {
-      RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+  // compute unique hosts
+  unordered_set<TNetworkAddress> unique_hosts;
+  for (const FragmentExecParams& f: schedule->fragment_exec_params()) {
+    for (const FInstanceExecParams& i: f.instance_exec_params) {
+      unique_hosts.insert(i.host);
     }
   }
+  schedule->SetUniqueHosts(unique_hosts);
+
+  // TODO-MT: call AdmitQuery()
+  bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0;
+  if (!is_mt_execution && !FLAGS_disable_admission_control) {
+    RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+  }
   return Status::OK();
 }
 
@@ -1020,7 +883,7 @@ void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_i
 void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
     const TBackendDescriptor& backend, PlanNodeId node_id,
     const vector<TNetworkAddress>& host_list,
-    const TScanRangeLocations& scan_range_locations,
+    const TScanRangeLocationList& scan_range_locations,
     FragmentScanRangeAssignment* assignment) {
   int64_t scan_range_length = 0;
   if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
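
The scan-range division added in CreateScanInstances() above is easier to follow
outside diff form. The following self-contained C++ sketch (hypothetical helper
name DivideScanRanges and simplified stand-in types, not Impala's TScanRangeParams
or FInstanceExecParams) shows the same greedy scheme: sum a host's total work,
weighting an HDFS split by its byte length and any other split as 1, derive an
average per instance, and keep handing ranges to an instance until it is just past
that average. In this sketch the last instance takes whatever remains; the patch's
exact rounding may differ.

#include <algorithm>
#include <cstdint>
#include <vector>

// Simplified stand-in for TScanRangeParams: hdfs_length < 0 models a non-HDFS
// (Kudu/HBase) split, which the scheduler weights as 1.
struct ScanRange { int64_t hdfs_length; };

// Greedy division of one host's scan ranges among up to max_num_instances
// instances: assign ranges to an instance until its total is just beyond the
// per-instance average, then move to the next instance.
std::vector<std::vector<ScanRange>> DivideScanRanges(
    const std::vector<ScanRange>& ranges, int max_num_instances) {
  if (ranges.empty()) return {};
  int num_instances = std::max(1, max_num_instances);
  num_instances = std::min(num_instances, static_cast<int>(ranges.size()));

  int64_t total_size = 0;
  for (const ScanRange& r : ranges) {
    total_size += (r.hdfs_length >= 0) ? r.hdfs_length : 1;
  }
  int64_t avg_per_instance = std::max<int64_t>(1, total_size / num_instances);

  std::vector<std::vector<ScanRange>> per_instance(num_instances);
  size_t idx = 0;
  for (int i = 0; i < num_instances && idx < ranges.size(); ++i) {
    int64_t assigned = 0;
    // The last instance takes everything that is left over.
    while (idx < ranges.size()
        && (assigned < avg_per_instance || i == num_instances - 1)) {
      const ScanRange& r = ranges[idx++];
      assigned += (r.hdfs_length >= 0) ? r.hdfs_length : 1;
      per_instance[i].push_back(r);
    }
  }
  return per_instance;
}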

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 6a9d6db..cc68432 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -40,8 +40,6 @@
 
 namespace impala {
 
-class Coordinator;
-
 namespace test {
 class SchedulerWrapper;
 }
@@ -199,7 +197,7 @@ class SimpleScheduler : public Scheduler {
     /// scan range and its replica locations.
     void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id,
         const vector<TNetworkAddress>& host_list,
-        const TScanRangeLocations& scan_range_locations,
+        const TScanRangeLocationList& scan_range_locations,
         FragmentScanRangeAssignment* assignment);
 
     const BackendConfig& backend_config() const { return backend_config_; }
@@ -331,11 +329,11 @@ class SimpleScheduler : public Scheduler {
   Status GetRequestPool(const std::string& user, const TQueryOptions& query_options,
       std::string* pool) const;
 
-  /// Compute the assignment of scan ranges to hosts for each scan node in 'schedule'.
+  /// Compute the assignment of scan ranges to hosts for each scan node in
+  /// the schedule's TQueryExecRequest.plan_exec_info.
   /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
   /// fragment_exec_params_ with the resulting scan range assignment.
-  Status ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
-      QuerySchedule* schedule);
+  Status ComputeScanRangeAssignment(QuerySchedule* schedule);
 
   /// Process the list of scan ranges of a single plan node and compute scan range
   /// assignments (returned in 'assignment'). The result is a mapping from hosts to their
@@ -397,50 +395,49 @@ class SimpleScheduler : public Scheduler {
   /// assignment:              Output parameter, to which new assignments will be added.
   Status ComputeScanRangeAssignment(const BackendConfig& backend_config,
       PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
-      bool node_random_replica, const std::vector<TScanRangeLocations>& locations,
+      bool node_random_replica, const std::vector<TScanRangeLocationList>& locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
-  /// Populate fragment_exec_params_ in schedule.
-  void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
-      QuerySchedule* schedule);
-
-  /// Compute the assignment of scan ranges to hosts for each scan node in
-  /// the schedule's TQueryExecRequest.mt_plan_exec_info.
-  /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
-  /// mt_fragment_exec_params_ with the resulting scan range assignment.
-  Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
-
-  /// Compute the MtFragmentExecParams for all plans in the schedule's
-  /// TQueryExecRequest.mt_plan_exec_info.
+  /// Compute the FragmentExecParams for all plans in the schedule's
+  /// TQueryExecRequest.plan_exec_info.
   /// This includes the routing information (destinations, per_exch_num_senders,
   /// sender_id)
-  void MtComputeFragmentExecParams(QuerySchedule* schedule);
+  void ComputeFragmentExecParams(QuerySchedule* schedule);
 
   /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
   /// fragment_params and its input fragments via a depth-first traversal.
   /// All fragments are part of plan_exec_info.
-  void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
-      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+  void ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances of the fragment corresponding to fragment_params, which contains
+  /// a Union node.
+  /// UnionNodes are special because they can consume multiple partitioned inputs,
+  /// as well as execute multiple scans in the same fragment.
+  /// Fragments containing a UnionNode are executed on the union of hosts of all
+  /// scans in the fragment as well as the hosts of all its input fragments (s.t.
+  /// a UnionNode with partitioned joins or grouping aggregates as children runs on
+  /// at least as many hosts as the input to those children).
+  /// TODO: is this really necessary? If not, revise.
+  void CreateUnionInstances(
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Create instances of the fragment corresponding to fragment_params to run on the
   /// selected replica hosts of the scan ranges of the node with id scan_id.
   /// The maximum number of instances is the value of query option mt_dop.
-  /// This attempts to load balance among instances by computing the average number
-  /// of bytes per instances and then in a single pass assigning scan ranges to each
+  /// For HDFS, this attempts to load balance among instances by computing the average
+  /// number of bytes per instances and then in a single pass assigning scan ranges to each
   /// instances to roughly meet that average.
-  void MtCreateScanInstances(PlanNodeId scan_id,
-      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
-
-  /// For each instance of the single input fragment of the fragment corresponding to
-  /// fragment_params, create an instance for this fragment.
-  void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params,
-      QuerySchedule* schedule);
-
-  /// For each fragment in exec_request, computes hosts on which to run the instances
-  /// and stores result in fragment_exec_params_.hosts.
-  void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
+  /// For all other storage mgrs, it load-balances the number of splits per instance.
+  void CreateScanInstances(PlanNodeId scan_id,
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// For each instance of fragment_params's input fragment, create a collocated
+  /// instance for fragment_params's fragment.
+  /// Expects that fragment_params only has a single input fragment.
+  void CreateCollocatedInstances(FragmentExecParams* fragment_params,
       QuerySchedule* schedule);
 
   /// Return the id of the leftmost node of any of the given types in 'plan', or
@@ -450,14 +447,9 @@ class SimpleScheduler : public Scheduler {
   /// Same for scan nodes.
   PlanNodeId FindLeftmostScan(const TPlan& plan);
 
-  /// Return the index (w/in exec_request.fragments) of fragment that sends its output to
-  /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
-  /// Return INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node.
-  int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest& exec_request);
-
   /// Add all hosts the given scan is executed on to scan_hosts.
-  void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest& exec_request,
-      const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts);
+  void GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& params,
+      std::vector<TNetworkAddress>* scan_hosts);
 
   /// Return true if 'plan' contains a node of the given type.
   bool ContainsNode(const TPlan& plan, TPlanNodeType::type type);
@@ -466,11 +458,6 @@ class SimpleScheduler : public Scheduler {
   void FindNodes(const TPlan& plan, const std::vector<TPlanNodeType::type>& types,
       std::vector<TPlanNodeId>* results);
 
-  /// Returns the index (w/in exec_request.fragments) of fragment that sends its output
-  /// to the given exchange in the given fragment index.
-  int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
-      const TQueryExecRequest& exec_request);
-
   friend class impala::test::SchedulerWrapper;
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
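
The CreateUnionInstances() contract documented in the header comment above reduces
to a set union over hosts: every host that serves a scan in the fragment, plus every
host that runs an instance of one of its input fragments, gets exactly one instance.
A minimal C++ sketch of that host computation, using hypothetical helper types
rather than Impala's TNetworkAddress and FragmentExecParams:

#include <string>
#include <unordered_set>
#include <vector>

// Hosts are plain strings here; Impala uses TNetworkAddress.
using Host = std::string;

struct InputFragment { std::vector<Host> instance_hosts; };

// Union of the hosts of all scans in the fragment and the hosts of all its
// input fragments; one fragment instance is created per element of the result.
std::unordered_set<Host> UnionFragmentHosts(
    const std::vector<std::vector<Host>>& per_scan_hosts,
    const std::vector<InputFragment>& input_fragments) {
  std::unordered_set<Host> hosts;
  for (const std::vector<Host>& scan_hosts : per_scan_hosts) {
    hosts.insert(scan_hosts.begin(), scan_hosts.end());
  }
  for (const InputFragment& input : input_fragments) {
    hosts.insert(input.instance_hosts.begin(), input.instance_hosts.end());
  }
  return hosts;
}

This guarantees the union fragment runs on at least as many hosts as any of its
children; per the TODO-MT in the diff, running more than one union instance per
host is still an open question.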

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 923c864..b335a29 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -713,7 +713,12 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
         summary = exec_state->coord()->exec_summary();
       }
       if (include_json_plan) {
-        fragments = exec_state->exec_request().query_exec_request.fragments;
+        for (const TPlanExecInfo& plan_exec_info:
+            exec_state->exec_request().query_exec_request.plan_exec_info) {
+          for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+            fragments.push_back(fragment);
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4886bdf..c0eed50 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1599,7 +1599,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
   }
 
   // Save the query fragments so that the plan can be visualised.
-  fragments = exec_state.exec_request().query_exec_request.fragments;
+  for (const TPlanExecInfo& plan_exec_info:
+      exec_state.exec_request().query_exec_request.plan_exec_info) {
+    fragments.insert(fragments.end(),
+        plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
+  }
   all_rows_returned = exec_state.eos();
   last_active_time = exec_state.last_active();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 7c40d37..5cd4c0c 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -379,8 +379,7 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp(
 Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     const TQueryExecRequest& query_exec_request) {
   // we always need at least one plan fragment
-  DCHECK(query_exec_request.fragments.size() > 0
-      || query_exec_request.mt_plan_exec_info.size() > 0);
+  DCHECK(query_exec_request.plan_exec_info.size() > 0);
 
   if (query_exec_request.__isset.query_plan) {
     stringstream plan_ss;
@@ -427,11 +426,6 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
   }
 
-  bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
-  const TPlanFragment& fragment = is_mt_exec
-      ? query_exec_request.mt_plan_exec_info[0].fragments[0]
-      : query_exec_request.fragments[0];
-
   {
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index fbbf7be..f6ba35a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -347,15 +347,9 @@ struct TPlanExecInfo {
   // it is unpartitioned.
   1: required list<Planner.TPlanFragment> fragments
 
-  // Specifies the destination fragment of the output of each fragment.
-  // dest_fragment_idx.size() == fragments.size() - 1 and
-  // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
-  // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient
-  2: optional list<i32> dest_fragment_idx
-
   // A map from scan node ids to a list of scan range locations.
-  // The node ids refer to scan nodes in fragments[].plan_tree
-  3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
+  // The node ids refer to scan nodes in fragments[].plan
+  2: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocationList>>
       per_node_scan_ranges
 }
 
@@ -364,57 +358,39 @@ struct TQueryExecRequest {
   // global descriptor tbl for all fragments
   1: optional Descriptors.TDescriptorTable desc_tbl
 
-  // fragments[i] may consume the output of fragments[j > i];
-  // fragments[0] is the root fragment and also the coordinator fragment, if
-  // it is unpartitioned.
-  2: optional list<Planner.TPlanFragment> fragments
-
-  // Specifies the destination fragment of the output of each fragment.
-  // parent_fragment_idx.size() == fragments.size() - 1 and
-  // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
-  3: optional list<i32> dest_fragment_idx
-
-  // A map from scan node ids to a list of scan range locations.
-  // The node ids refer to scan nodes in fragments[].plan_tree
-  4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
-      per_node_scan_ranges
-
-  // Multi-threaded execution: exec info for all plans; the first one materializes
-  // the query result
-  // TODO: this will eventually supercede fields fragments, dest_fragment_idx,
-  // per_node_scan_ranges
-  14: optional list<TPlanExecInfo> mt_plan_exec_info
+  // exec info for all plans; the first one materializes the query result
+  2: optional list<TPlanExecInfo> plan_exec_info
 
   // Metadata of the query result set (only for select)
-  5: optional Results.TResultSetMetadata result_set_metadata
+  3: optional Results.TResultSetMetadata result_set_metadata
 
   // Set if the query needs finalization after it executes
-  6: optional TFinalizeParams finalize_params
+  4: optional TFinalizeParams finalize_params
 
-  7: required ImpalaInternalService.TQueryCtx query_ctx
+  5: required ImpalaInternalService.TQueryCtx query_ctx
 
   // The same as the output of 'explain <query>'
-  8: optional string query_plan
+  6: optional string query_plan
 
   // The statement type governs when the coordinator can judge a query to be finished.
   // DML queries are complete after Wait(), SELECTs may not be. Generally matches
   // the stmt_type of the parent TExecRequest, but in some cases (such as CREATE TABLE
   // AS SELECT), these may differ.
-  9: required Types.TStmtType stmt_type
+  7: required Types.TStmtType stmt_type
 
   // Estimated per-host peak memory consumption in bytes. Used for resource management.
-  10: optional i64 per_host_mem_req
+  8: optional i64 per_host_mem_req
 
   // Estimated per-host CPU requirements in YARN virtual cores.
   // Used for resource management.
   // TODO: Remove this and associated code in Planner.
-  11: optional i16 per_host_vcores
+  9: optional i16 per_host_vcores
 
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
-  12: required list<Types.TNetworkAddress> host_list
+  10: required list<Types.TNetworkAddress> host_list
 
   // Column lineage graph
-  13: optional LineageGraph.TLineageGraph lineage_graph
+  11: optional LineageGraph.TLineageGraph lineage_graph
 }
 
 enum TCatalogOpType {
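
With dest_fragment_idx removed from the Thrift structs above, a sender's receiver
is now recovered from its output sink: the destination is the fragment whose plan
contains output_sink.stream_sink.dest_node_id (an exchange node). A hedged C++
sketch of that lookup, using simplified stand-ins for the generated Thrift classes
and a hypothetical helper name:

#include <cstdint>
#include <unordered_map>
#include <vector>

using PlanNodeId = int32_t;

// Simplified stand-ins for the generated Thrift types.
struct PlanNode { PlanNodeId node_id; };
struct PlanFragment {
  int idx = 0;                        // position within TPlanExecInfo.fragments
  std::vector<PlanNode> plan_nodes;   // nodes of this fragment's plan tree
  bool has_stream_sink = false;       // output_sink.__isset.stream_sink
  PlanNodeId sink_dest_node_id = -1;  // output_sink.stream_sink.dest_node_id
};

// Maps each sending fragment's idx to the idx of the fragment that owns the
// exchange node it sends to (the root fragment has no stream sink).
std::unordered_map<int, int> SenderToReceiverIdx(
    const std::vector<PlanFragment>& fragments) {
  std::unordered_map<PlanNodeId, int> node_to_fragment;
  for (const PlanFragment& f : fragments) {
    for (const PlanNode& n : f.plan_nodes) node_to_fragment[n.node_id] = f.idx;
  }
  std::unordered_map<int, int> sender_to_receiver;
  for (const PlanFragment& f : fragments) {
    if (!f.has_stream_sink) continue;
    sender_to_receiver[f.idx] = node_to_fragment.at(f.sink_dest_node_id);
  }
  return sender_to_receiver;
}

This mirrors the derivation SimpleScheduler::ComputeFragmentExecParams() performs
via QuerySchedule::GetFragmentIdx() in the C++ diff above.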

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 92c8681..766551e 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -77,9 +77,7 @@ struct TScanRangeLocation {
 }
 
 // A single scan range plus the hosts that serve it
-// TODO: rename to TScanRangeLocationList, having this differ from the above struct
-// by only a single letter has caused needless confusion
-struct TScanRangeLocations {
+struct TScanRangeLocationList {
   1: required PlanNodes.TScanRange scan_range
   // non-empty list
   2: list<TScanRangeLocation> locations

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 1f5665f..307e67e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -54,7 +54,7 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TStatus;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -322,7 +322,7 @@ public class DataSourceScanNode extends ScanNode {
     TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345");
     Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
     scanRanges_ = Lists.newArrayList(
-        new TScanRangeLocations(
+        new TScanRangeLocationList(
             new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index f299131..89296f1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -59,7 +59,7 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -337,13 +337,13 @@ public class HBaseScanNode extends ScanNode {
           // extend the key range
           setKeyRangeEnd(keyRange, curRegEndKey);
         } else {
-          // create a new HBaseKeyRange (and TScanRange2/TScanRangeLocations to go
+          // create a new HBaseKeyRange (and TScanRange2/TScanRangeLocationList to go
           // with it).
           keyRange = new THBaseKeyRange();
           setKeyRangeStart(keyRange, curRegStartKey);
           setKeyRangeEnd(keyRange, curRegEndKey);
 
-          TScanRangeLocations scanRangeLocation = new TScanRangeLocations();
+          TScanRangeLocationList scanRangeLocation = new TScanRangeLocationList();
           TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
           scanRangeLocation.addToLocations(
               new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 95d86e3..66ed792 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -51,7 +51,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TReplicaPreference;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.util.MembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -353,7 +353,7 @@ public class HdfsScanNode extends ScanNode {
                 fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
                 fileDesc.getFileLength(), fileDesc.getFileCompression(),
                 fileDesc.getModificationTime()));
-            TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
+            TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
             scanRangeLocations.scan_range = scanRange;
             scanRangeLocations.locations = locations;
             scanRanges_.add(scanRangeLocations);
@@ -456,7 +456,7 @@ public class HdfsScanNode extends ScanNode {
     int totalNodes = 0;
     int numLocalRanges = 0;
     int numRemoteRanges = 0;
-    for (TScanRangeLocations range: scanRanges_) {
+    for (TScanRangeLocationList range: scanRanges_) {
       boolean anyLocal = false;
       for (TScanRangeLocation loc: range.locations) {
         TNetworkAddress dataNode = analyzer.getHostIndex().getEntry(loc.getHost_idx());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index d338608..7ef1c20 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -43,7 +43,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
@@ -183,7 +183,7 @@ public class KuduScanNode extends ScanNode {
             token.toString(), e);
       }
 
-      TScanRangeLocations locs = new TScanRangeLocations();
+      TScanRangeLocationList locs = new TScanRangeLocationList();
       locs.setScan_range(scanRange);
       locs.locations = locations;
       scanRanges_.add(locs);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index fade723..ec4cf7e 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -302,9 +302,7 @@ public class Planner {
       for (int i = 0; i < fragments.size(); ++i) {
         PlanFragment fragment = fragments.get(i);
         str.append(fragment.getExplainString(explainLevel));
-        if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) {
-          str.append("\n");
-        }
+        if (i < fragments.size() - 1) str.append("\n");
       }
     }
     return str.toString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index f444ffa..985a03a 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -26,7 +26,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TNetworkAddress;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -45,7 +45,7 @@ abstract public class ScanNode extends PlanNode {
   protected int numPartitionsMissingStats_ = 0;
 
   // List of scan-range locations. Populated in init().
-  protected List<TScanRangeLocations> scanRanges_;
+  protected List<TScanRangeLocationList> scanRanges_;
 
   public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
     super(id, desc.getId().asList(), displayName);
@@ -82,7 +82,7 @@ abstract public class ScanNode extends PlanNode {
   /**
    * Returns all scan ranges plus their locations.
    */
-  public List<TScanRangeLocations> getScanRangeLocations() {
+  public List<TScanRangeLocationList> getScanRangeLocations() {
     Preconditions.checkNotNull(scanRanges_, "Need to call init() first.");
     return scanRanges_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 4bb51ad..a974278 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -919,27 +919,15 @@ public class Frontend {
     TPlanExecInfo result = new TPlanExecInfo();
     ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
 
-    // map from fragment to its index in TPlanExecInfo.fragments; needed for
-    // TPlanExecInfo.dest_fragment_idx
+    // collect ScanNodes
     List<ScanNode> scanNodes = Lists.newArrayList();
-    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
-    for (int idx = 0; idx < fragments.size(); ++idx) {
-      PlanFragment fragment = fragments.get(idx);
+    for (PlanFragment fragment: fragments) {
       Preconditions.checkNotNull(fragment.getPlanRoot());
       fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
-      fragmentIdx.put(fragment, idx);
-    }
-
-    // set fragment destinations
-    for (int i = 1; i < fragments.size(); ++i) {
-      PlanFragment dest = fragments.get(i).getDestFragment();
-      Integer idx = fragmentIdx.get(dest);
-      Preconditions.checkState(idx != null);
-      result.addToDest_fragment_idx(idx.intValue());
     }
 
     // Set scan ranges/locations for scan nodes.
-    LOG.debug("get scan range locations");
+    LOG.trace("get scan range locations");
     Set<TTableName> tablesMissingStats = Sets.newTreeSet();
     Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
     for (ScanNode scanNode: scanNodes) {
@@ -960,6 +948,16 @@ public class Frontend {
       queryCtx.addToTables_with_corrupt_stats(tableName);
     }
 
+    // Compute resource requirements after scan range locations because the cost
+    // estimates of scan nodes rely on them.
+    try {
+      planner.computeResourceReqs(fragments, true, queryExecRequest);
+    } catch (Exception e) {
+      // Turn exceptions into a warning to allow the query to execute.
+      LOG.error("Failed to compute resource requirements for query\n" +
+          queryCtx.request.getStmt(), e);
+    }
+
     // The fragment at this point has all state set, serialize it to thrift.
     for (PlanFragment fragment: fragments) {
       TPlanFragment thriftFragment = fragment.toThrift();
@@ -970,144 +968,54 @@ public class Frontend {
   }
 
   /**
-   * Create a populated TQueryExecRequest, corresponding to the supplied planner,
-   * for multi-threaded execution.
+   * Create a populated TQueryExecRequest, corresponding to the supplied planner.
    */
-  private TQueryExecRequest mtCreateExecRequest(
-      Planner planner, StringBuilder explainString)
-      throws ImpalaException {
+  private TQueryExecRequest createExecRequest(
+      Planner planner, StringBuilder explainString) throws ImpalaException {
     TQueryCtx queryCtx = planner.getQueryCtx();
-    Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0);
-    // for now, always disable spilling in the backend
-    // TODO-MT: re-enable spilling
-    queryCtx.setDisable_spilling(true);
-    TQueryExecRequest result = new TQueryExecRequest();
-
-    LOG.debug("create mt plan");
-    List<PlanFragment> planRoots = planner.createParallelPlans();
+    AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
+    boolean isMtExec =
+        analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0;
 
-    // create EXPLAIN output
-    result.setQuery_ctx(queryCtx);  // needed by getExplainString()
-    explainString.append(
-        planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result));
-    result.setQuery_plan(explainString.toString());
+    List<PlanFragment> planRoots = Lists.newArrayList();
+    TQueryExecRequest result = new TQueryExecRequest();
+    if (isMtExec) {
+      LOG.debug("create mt plan");
+      planRoots.addAll(planner.createParallelPlans());
+    } else {
+      LOG.debug("create plan");
+      planRoots.add(planner.createPlan().get(0));
+    }
 
     // create per-plan exec info;
     // also assemble list of names of tables with missing or corrupt stats for
     // assembling a warning message
     for (PlanFragment planRoot: planRoots) {
-      result.addToMt_plan_exec_info(
+      result.addToPlan_exec_info(
           createPlanExecInfo(planRoot, planner, queryCtx, result));
     }
 
-    // assign fragment ids
-    int idx = 0;
-    for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) {
-      for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
-    }
-
-    // TODO-MT: implement
-    // Compute resource requirements after scan range locations because the cost
-    // estimates of scan nodes rely on them.
-    //try {
-      //planner.computeResourceReqs(fragments, true, queryExecRequest);
-    //} catch (Exception e) {
-      //// Turn exceptions into a warning to allow the query to execute.
-      //LOG.error("Failed to compute resource requirements for query\n" +
-          //queryCtx.request.getStmt(), e);
-    //}
-
-    return result;
-  }
-
-  /**
-   * Create a populated TQueryExecRequest corresponding to the supplied TQueryCtx.
-   * TODO-MT: remove this function and rename mtCreateExecRequest() to
-   * createExecRequest()
-   */
-  private TQueryExecRequest createExecRequest(
-      Planner planner, StringBuilder explainString)
-      throws ImpalaException {
-    LOG.debug("create plan");
-    ArrayList<PlanFragment> fragments = planner.createPlan();
-
-    List<ScanNode> scanNodes = Lists.newArrayList();
-    // map from fragment to its index in queryExecRequest.fragments; needed for
-    // queryExecRequest.dest_fragment_idx
-    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
-
-    for (int idx = 0; idx < fragments.size(); ++idx) {
-      PlanFragment fragment = fragments.get(idx);
-      Preconditions.checkNotNull(fragment.getPlanRoot());
-      fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
-      fragmentIdx.put(fragment, idx);
-    }
-
-    TQueryExecRequest result = new TQueryExecRequest();
-    // set fragment destinations
-    for (int i = 1; i < fragments.size(); ++i) {
-      PlanFragment dest = fragments.get(i).getDestFragment();
-      Integer idx = fragmentIdx.get(dest);
-      Preconditions.checkState(idx != null);
-      result.addToDest_fragment_idx(idx.intValue());
-    }
-
-    // Set scan ranges/locations for scan nodes.
-    // Also assemble list of tables names missing stats for assembling a warning message.
-    LOG.debug("get scan range locations");
-    Set<TTableName> tablesMissingStats = Sets.newTreeSet();
-    // Assemble a similar list for corrupt stats
-    Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
-    for (ScanNode scanNode: scanNodes) {
-      result.putToPer_node_scan_ranges(
-          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
-      if (scanNode.isTableMissingStats()) {
-        tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
-      }
-      if (scanNode.hasCorruptTableStats()) {
-        tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
-      }
-    }
-
-    TQueryCtx queryCtx = planner.getQueryCtx();
-    for (TTableName tableName: tablesMissingStats) {
-      queryCtx.addToTables_missing_stats(tableName);
-    }
-    for (TTableName tableName: tablesWithCorruptStats) {
-      queryCtx.addToTables_with_corrupt_stats(tableName);
-    }
-
     // Optionally disable spilling in the backend. Allow spilling if there are plan hints
     // or if all tables have stats.
-    AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
-    if (queryCtx.request.query_options.isDisable_unsafe_spills()
-        && !tablesMissingStats.isEmpty()
-        && !analysisResult.getAnalyzer().hasPlanHints()) {
-      queryCtx.setDisable_spilling(true);
-    }
-
-    // Compute resource requirements after scan range locations because the cost
-    // estimates of scan nodes rely on them.
-    try {
-      planner.computeResourceReqs(fragments, true, result);
-    } catch (Exception e) {
-      // Turn exceptions into a warning to allow the query to execute.
-      LOG.error("Failed to compute resource requirements for query\n" +
-          queryCtx.request.getStmt(), e);
-    }
-
-    // The fragment at this point has all state set, assign sequential ids
-    // and serialize to thrift.
-    for (int i = 0; i < fragments.size(); ++i) {
-      PlanFragment fragment = fragments.get(i);
-      TPlanFragment thriftFragment = fragment.toThrift();
-      thriftFragment.setIdx(i);
-      result.addToFragments(thriftFragment);
+    boolean disableSpilling =
+        queryCtx.request.query_options.isDisable_unsafe_spills()
+          && !queryCtx.tables_missing_stats.isEmpty()
+          && !analysisResult.getAnalyzer().hasPlanHints();
+    // for now, always disable spilling for multi-threaded execution
+    if (isMtExec || disableSpilling) queryCtx.setDisable_spilling(true);
+
+    // assign fragment idx
+    int idx = 0;
+    for (TPlanExecInfo planExecInfo: result.plan_exec_info) {
+      for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
     }
 
+    // create EXPLAIN output after setting everything else
     result.setQuery_ctx(queryCtx);  // needed by getExplainString()
-    explainString.append(planner.getExplainString(fragments, result));
+    ArrayList<PlanFragment> allFragments = planRoots.get(0).getNodesPreOrder();
+    explainString.append(planner.getExplainString(allFragments, result));
     result.setQuery_plan(explainString.toString());
+
     return result;
   }
 
@@ -1156,12 +1064,7 @@ public class Frontend {
         || analysisResult.isDeleteStmt());
 
     Planner planner = new Planner(analysisResult, queryCtx);
-    TQueryExecRequest queryExecRequest;
-    if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) {
-      queryExecRequest = mtCreateExecRequest(planner, explainString);
-    } else {
-      queryExecRequest = createExecRequest(planner, explainString);
-    }
+    TQueryExecRequest queryExecRequest = createExecRequest(planner, explainString);
     queryExecRequest.setDesc_tbl(
         planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
     queryExecRequest.setQuery_ctx(queryCtx);
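
For context, with the unified createExecRequest() above every plan (one for regular execution, possibly several for mt_dop > 0) contributes one TPlanExecInfo, and fragment indexes are assigned globally across all of them. Below is a minimal sketch of walking that nested layout; the ExecRequestPrinter class is hypothetical, while the thrift fields and accessors (plan_exec_info, fragments, isSetPlan(), setIdx()/getIdx()) are the ones this patch itself uses.

import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TQueryExecRequest;

// Hypothetical helper, for illustration only: prints every fragment and plan
// node in the nested TQueryExecRequest layout produced by createExecRequest().
public class ExecRequestPrinter {
  public static void print(TQueryExecRequest execRequest) {
    if (!execRequest.isSetPlan_exec_info()) return;
    for (TPlanExecInfo execInfo : execRequest.plan_exec_info) {
      for (TPlanFragment fragment : execInfo.fragments) {
        // The fragment idx is unique across all TPlanExecInfo entries, since
        // createExecRequest() assigns it from a single running counter.
        System.out.println("fragment idx=" + fragment.getIdx());
        if (!fragment.isSetPlan()) continue;
        for (TPlanNode node : fragment.plan.nodes) {
          System.out.println("  node id=" + node.node_id);
        }
      }
    }
  }
}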

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index a94901b..745efa3 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -57,12 +57,13 @@ import org.apache.impala.thrift.THdfsScanNode;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.thrift.TScanRangeLocations;
+import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTupleDescriptor;
@@ -133,9 +134,11 @@ public class PlannerTestBase extends FrontendTestBase {
     planMap_.clear();
     tupleMap_.clear();
     tableMap_.clear();
-    for (TPlanFragment frag: execRequest.fragments) {
-      for (TPlanNode node: frag.plan.nodes) {
-        planMap_.put(node.node_id, node);
+    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+      for (TPlanFragment frag: execInfo.fragments) {
+        for (TPlanNode node: frag.plan.nodes) {
+          planMap_.put(node.node_id, node);
+        }
       }
     }
     if (execRequest.isSetDesc_tbl()) {
@@ -194,24 +197,28 @@ public class PlannerTestBase extends FrontendTestBase {
     long insertTableId = -1;
     // Collect all partitions that are referenced by a scan range.
     Set<THdfsPartition> scanRangePartitions = Sets.newHashSet();
-    if (execRequest.per_node_scan_ranges != null) {
-      for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
-           execRequest.per_node_scan_ranges.entrySet()) {
-        if (entry.getValue() == null) {
-          continue;
-        }
-        for (TScanRangeLocations locations: entry.getValue()) {
-          if (locations.scan_range.isSetHdfs_file_split()) {
-            THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
-            THdfsPartition partition = findPartition(entry.getKey(), split);
-            scanRangePartitions.add(partition);
+    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+      if (execInfo.per_node_scan_ranges != null) {
+        for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
+             execInfo.per_node_scan_ranges.entrySet()) {
+          if (entry.getValue() == null) {
+            continue;
+          }
+          for (TScanRangeLocationList locationList: entry.getValue()) {
+            if (locationList.scan_range.isSetHdfs_file_split()) {
+              THdfsFileSplit split = locationList.scan_range.getHdfs_file_split();
+              THdfsPartition partition = findPartition(entry.getKey(), split);
+              scanRangePartitions.add(partition);
+            }
           }
         }
       }
     }
+
     if (execRequest.isSetFinalize_params()) {
       insertTableId = execRequest.getFinalize_params().getTable_id();
     }
+
     boolean first = true;
     // Iterate through all partitions of the descriptor table and verify all partitions
     // are referenced.
@@ -241,64 +248,62 @@ public class PlannerTestBase extends FrontendTestBase {
    */
   private StringBuilder printScanRangeLocations(TQueryExecRequest execRequest) {
     StringBuilder result = new StringBuilder();
-    if (execRequest.per_node_scan_ranges == null) {
-      return result;
-    }
-    for (Map.Entry<Integer, List<TScanRangeLocations>> entry:
-        execRequest.per_node_scan_ranges.entrySet()) {
-      result.append("NODE " + entry.getKey().toString() + ":\n");
-      if (entry.getValue() == null) {
-        continue;
-      }
-
-      for (TScanRangeLocations locations: entry.getValue()) {
-        // print scan range
-        result.append("  ");
-        if (locations.scan_range.isSetHdfs_file_split()) {
-          THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
-          THdfsTable table = findTable(entry.getKey());
-          THdfsPartition partition = table.getPartitions().get(split.partition_id);
-          THdfsPartitionLocation location = partition.getLocation();
-          String file_location = location.getSuffix();
-          if (location.prefix_index != -1) {
-            file_location =
-                table.getPartition_prefixes().get(location.prefix_index) + file_location;
-          }
-          Path filePath = new Path(file_location, split.file_name);
-          filePath = cleanseFilePath(filePath);
-          result.append("HDFS SPLIT " + filePath.toString() + " "
-              + Long.toString(split.offset) + ":" + Long.toString(split.length));
-        }
-        if (locations.scan_range.isSetHbase_key_range()) {
-          THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
-          Integer hostIdx = locations.locations.get(0).host_idx;
-          TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
-          result.append("HBASE KEYRANGE ");
-          result.append("port=" + networkAddress.port + " ");
-          if (keyRange.isSetStartKey()) {
-            result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
-          } else {
-            result.append("<unbounded>");
+    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
+      if (execInfo.per_node_scan_ranges == null) continue;
+      for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
+          execInfo.per_node_scan_ranges.entrySet()) {
+        result.append("NODE " + entry.getKey().toString() + ":\n");
+        if (entry.getValue() == null) continue;
+
+        for (TScanRangeLocationList locations: entry.getValue()) {
+          // print scan range
+          result.append("  ");
+          if (locations.scan_range.isSetHdfs_file_split()) {
+            THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
+            THdfsTable table = findTable(entry.getKey());
+            THdfsPartition partition = table.getPartitions().get(split.partition_id);
+            THdfsPartitionLocation location = partition.getLocation();
+            String file_location = location.getSuffix();
+            if (location.prefix_index != -1) {
+              file_location =
+                  table.getPartition_prefixes().get(location.prefix_index) + file_location;
+            }
+            Path filePath = new Path(file_location, split.file_name);
+            filePath = cleanseFilePath(filePath);
+            result.append("HDFS SPLIT " + filePath.toString() + " "
+                + Long.toString(split.offset) + ":" + Long.toString(split.length));
           }
-          result.append(":");
-          if (keyRange.isSetStopKey()) {
-            result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
-          } else {
-            result.append("<unbounded>");
+          if (locations.scan_range.isSetHbase_key_range()) {
+            THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
+            Integer hostIdx = locations.locations.get(0).host_idx;
+            TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
+            result.append("HBASE KEYRANGE ");
+            result.append("port=" + networkAddress.port + " ");
+            if (keyRange.isSetStartKey()) {
+              result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
+            } else {
+              result.append("<unbounded>");
+            }
+            result.append(":");
+            if (keyRange.isSetStopKey()) {
+              result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
+            } else {
+              result.append("<unbounded>");
+            }
           }
-        }
 
-        if (locations.scan_range.isSetKudu_scan_token()) {
-          Preconditions.checkNotNull(kuduClient_,
-              "Test should not be invoked on platforms that do not support Kudu.");
-          try {
-            result.append(KuduScanToken.stringifySerializedToken(
-                locations.scan_range.kudu_scan_token.array(), kuduClient_));
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to parse Kudu scan token", e);
+          if (locations.scan_range.isSetKudu_scan_token()) {
+            Preconditions.checkNotNull(kuduClient_,
+                "Test should not be invoked on platforms that do not support Kudu.");
+            try {
+              result.append(KuduScanToken.stringifySerializedToken(
+                  locations.scan_range.kudu_scan_token.array(), kuduClient_));
+            } catch (IOException e) {
+              throw new IllegalStateException("Unable to parse Kudu scan token", e);
+            }
           }
+          result.append("\n");
         }
-        result.append("\n");
       }
     }
     return result;
@@ -446,8 +451,8 @@ public class PlannerTestBase extends FrontendTestBase {
       }
     }
 
-    if (execRequest.isSetFragments() && !execRequest.fragments.isEmpty()) {
-      TPlanFragment firstPlanFragment = execRequest.fragments.get(0);
+    if (execRequest.isSetPlan_exec_info() && !execRequest.plan_exec_info.isEmpty()) {
+      TPlanFragment firstPlanFragment = execRequest.plan_exec_info.get(0).fragments.get(0);
       if (firstPlanFragment.isSetOutput_sink()
           && firstPlanFragment.output_sink.isSetTable_sink()) {
         TTableSink tableSink = firstPlanFragment.output_sink.table_sink;
@@ -552,7 +557,7 @@ public class PlannerTestBase extends FrontendTestBase {
     // Query exec request may not be set for DDL, e.g., CTAS.
     String locationsStr = null;
     if (execRequest != null && execRequest.isSetQuery_exec_request()) {
-      if (execRequest.query_exec_request.fragments == null) return;
+      if (execRequest.query_exec_request.plan_exec_info == null) return;
       buildMaps(execRequest.query_exec_request);
       // If we optimize the partition key scans, we may get all the partition key values
       // from the metadata and don't reference any table. Skip the check in this case.
@@ -614,27 +619,29 @@ public class PlannerTestBase extends FrontendTestBase {
     if (execRequest == null) return;
     if (!execRequest.isSetQuery_exec_request()
         || execRequest.query_exec_request == null
-        || execRequest.query_exec_request.fragments == null) {
+        || execRequest.query_exec_request.plan_exec_info == null) {
       return;
     }
-    for (TPlanFragment planFragment : execRequest.query_exec_request.fragments) {
-      if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
-      for (TPlanNode node : planFragment.plan.nodes) {
-        if (!node.isSetLimit() || -1 == node.limit) continue;
-        if (!node.isSetEstimated_stats() || node.estimated_stats == null) continue;
-        if (node.limit < node.estimated_stats.cardinality) {
-          StringBuilder limitCardinalityError = new StringBuilder();
-          limitCardinalityError.append("Query: " + query + "\n");
-          limitCardinalityError.append(
-              "Expected cardinality estimate less than or equal to LIMIT: "
-              + node.limit + "\n");
-          limitCardinalityError.append(
-              "Actual cardinality estimate: "
-              + node.estimated_stats.cardinality + "\n");
-          limitCardinalityError.append(
-              "In node id "
-              + node.node_id + "\n");
-          errorLog.append(limitCardinalityError.toString());
+    for (TPlanExecInfo execInfo : execRequest.query_exec_request.plan_exec_info) {
+      for (TPlanFragment planFragment : execInfo.fragments) {
+        if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
+        for (TPlanNode node : planFragment.plan.nodes) {
+          if (!node.isSetLimit() || -1 == node.limit) continue;
+          if (!node.isSetEstimated_stats() || node.estimated_stats == null) continue;
+          if (node.limit < node.estimated_stats.cardinality) {
+            StringBuilder limitCardinalityError = new StringBuilder();
+            limitCardinalityError.append("Query: " + query + "\n");
+            limitCardinalityError.append(
+                "Expected cardinality estimate less than or equal to LIMIT: "
+                + node.limit + "\n");
+            limitCardinalityError.append(
+                "Actual cardinality estimate: "
+                + node.estimated_stats.cardinality + "\n");
+            limitCardinalityError.append(
+                "In node id "
+                + node.node_id + "\n");
+            errorLog.append(limitCardinalityError.toString());
+          }
         }
       }
     }
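
For context, per_node_scan_ranges now lives on each TPlanExecInfo rather than on the TQueryExecRequest itself, so any aggregation over scan ranges has to fold across all plans first. A small sketch follows; the ScanRangeStats class is hypothetical, while the thrift fields and the isSetHdfs_file_split() check mirror the test code above.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TScanRangeLocationList;

// Hypothetical helper, for illustration only: counts HDFS splits per plan-node
// id across all plans of a query.
public class ScanRangeStats {
  public static Map<Integer, Integer> countHdfsSplitsPerNode(TQueryExecRequest execRequest) {
    Map<Integer, Integer> result = new HashMap<>();
    if (execRequest.plan_exec_info == null) return result;
    for (TPlanExecInfo execInfo : execRequest.plan_exec_info) {
      if (execInfo.per_node_scan_ranges == null) continue;
      for (Map.Entry<Integer, List<TScanRangeLocationList>> entry :
          execInfo.per_node_scan_ranges.entrySet()) {
        if (entry.getValue() == null) continue;
        int numSplits = 0;
        for (TScanRangeLocationList locations : entry.getValue()) {
          if (locations.scan_range.isSetHdfs_file_split()) ++numSplits;
        }
        // Accumulate in case the same node id appears in more than one plan.
        result.merge(entry.getKey(), numSplits, Integer::sum);
      }
    }
    return result;
  }
}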

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index fe25599..6ce43db 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -70,23 +70,23 @@ PLAN-ROOT SINK
 |
 02:TOP-N [LIMIT=10]
 |  order by: count(int_col) ASC, bigint_col ASC
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=160B
 |  tuple-ids=2 row-size=16B cardinality=10
 |
 04:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
 |  group by: bigint_col
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=128.00MB
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 03:EXCHANGE [HASH(bigint_col)]
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 01:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: bigint_col
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=128.00MB
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -94,7 +94,7 @@ PLAN-ROOT SINK
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
-   hosts=3 per-host-mem=unavailable
+   hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
 # Single-table scan/filter/analysic should work.
@@ -136,16 +136,16 @@ PLAN-ROOT SINK
 |  partition by: int_col
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=3,2 row-size=16B cardinality=unavailable
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=3 row-size=8B cardinality=unavailable
 |
 03:EXCHANGE [HASH(int_col)]
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=0 row-size=8B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -153,7 +153,7 @@ PLAN-ROOT SINK
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
-   hosts=3 per-host-mem=unavailable
+   hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=8B cardinality=unavailable
 ====
 # Nested-loop join in a subplan should work.
@@ -216,39 +216,39 @@ PLAN-ROOT SINK
 |  tuple-ids=2,1,0 row-size=562B cardinality=1500000
 |
 01:SUBPLAN
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=2,1,0 row-size=562B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=unavailable
+|  |  hosts=3 per-host-mem=254B
 |  |  tuple-ids=2,1,0 row-size=562B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     hosts=3 per-host-mem=0B
 |  |     tuple-ids=0 row-size=254B cardinality=1
 |  |
 |  04:SUBPLAN
-|  |  hosts=3 per-host-mem=unavailable
+|  |  hosts=3 per-host-mem=0B
 |  |  tuple-ids=2,1 row-size=308B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
-|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  hosts=3 per-host-mem=124B
 |  |  |  tuple-ids=2,1 row-size=308B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
-|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     hosts=3 per-host-mem=0B
 |  |  |     tuple-ids=1 row-size=124B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
-|  |     hosts=3 per-host-mem=unavailable
+|  |     hosts=3 per-host-mem=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=unavailable
+|     hosts=3 per-host-mem=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -258,7 +258,7 @@ PLAN-ROOT SINK
    predicates on o_lineitems: l_linenumber < 3
    table stats: 150000 rows total
    columns missing stats: c_orders
-   hosts=3 per-host-mem=unavailable
+   hosts=3 per-host-mem=88.00MB
    tuple-ids=0 row-size=254B cardinality=15000
 ====
 # Hash-join in a subplan should work.
@@ -312,31 +312,31 @@ PLAN-ROOT SINK
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
 01:SUBPLAN
-|  hosts=3 per-host-mem=unavailable
+|  hosts=3 per-host-mem=0B
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-|  |  hosts=3 per-host-mem=unavailable
+|  |  hosts=3 per-host-mem=0B
 |  |  tuple-ids=1,0,2 row-size=286B cardinality=10
 |  |
 |  |--04:UNNEST [c.c_orders o2]
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     hosts=3 per-host-mem=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  05:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=unavailable
+|  |  hosts=3 per-host-mem=270B
 |  |  tuple-ids=1,0 row-size=278B cardinality=10
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     hosts=3 per-host-mem=0B
 |  |     tuple-ids=0 row-size=270B cardinality=1
 |  |
 |  03:UNNEST [c.c_orders o1]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=unavailable
+|     hosts=3 per-host-mem=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -345,6 +345,6 @@ PLAN-ROOT SINK
    predicates on o1: o1.o_orderkey < 5
    table stats: 150000 rows total
    columns missing stats: c_orders, c_orders
-   hosts=3 per-host-mem=unavailable
+   hosts=3 per-host-mem=88.00MB
    tuple-ids=0 row-size=270B cardinality=150000
 ====
