http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index bede0ef..741da32 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -56,10 +56,13 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/parallel-executor.h"
-#include "runtime/plan-fragment-executor.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
+#include "runtime/coordinator-filter-state.h"
+#include "runtime/coordinator-backend-state.h"
+#include "runtime/debug-options.h"
+#include "runtime/query-state.h"
 #include "scheduling/scheduler.h"
 #include "util/bloom-filter.h"
 #include "util/container-util.h"
@@ -78,7 +81,6 @@
 
 using namespace apache::thrift;
 using namespace strings;
-namespace accumulators = boost::accumulators;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -98,336 +100,17 @@ namespace impala {
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
 
-// container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action,
-// debug_phase)
-struct DebugOptions {
-  int instance_state_idx;
-  int node_id;
-  TDebugAction::type action;
-  TExecNodePhase::type phase;  // INVALID: debug options invalid
-
-  DebugOptions()
-    : instance_state_idx(-1), node_id(-1), action(TDebugAction::WAIT),
-      phase(TExecNodePhase::INVALID) {}
-
-  // If these debug options apply to the candidate fragment instance, returns true
-  // otherwise returns false.
-  bool IsApplicable(int candidate_instance_state_idx) {
-    if (phase == TExecNodePhase::INVALID) return false;
-    return (instance_state_idx == -1 ||
-        instance_state_idx == candidate_instance_state_idx);
-  }
-};
-
-/// Execution state of a particular fragment instance.
-///
-/// Concurrent accesses:
-/// - updates through UpdateFragmentExecStatus()
-class Coordinator::InstanceState {
- public:
-  InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
-    : exec_params_(params),
-      total_split_size_(0),
-      profile_(nullptr),
-      total_ranges_complete_(0),
-      rpc_latency_(0),
-      rpc_sent_(false),
-      done_(false),
-      profile_created_(false) {
-    const string& profile_name = Substitute("Instance $0 (host=$1)",
-        PrintId(params.instance_id), lexical_cast<string>(params.host));
-    profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
-  }
-
-  /// Called to set the initial status of the fragment instance after the
-  /// ExecRemoteFragment() RPC has returned. If 'rpc_sent' is true,
-  /// CancelFragmentInstances() will include this instance in the set of potential
-  /// fragment instances to cancel.
-  void SetInitialStatus(const Status& status, bool rpc_sent) {
-    DCHECK(!rpc_sent_);
-    rpc_sent_ = rpc_sent;
-    status_ = status;
-    if (!status_.ok()) return;
-    stopwatch_.Start();
-  }
-
-  /// Computes sum of split sizes of leftmost scan.
-  void ComputeTotalSplitSize(const PerNodeScanRanges& per_node_scan_ranges);
-
-  /// Updates the total number of scan ranges complete for this fragment. Returns the
-  /// delta since the last time this was called. Not thread-safe without lock() being
-  /// acquired by the caller.
-  int64_t UpdateNumScanRangesCompleted();
-
-  // The following getters do not require lock() to be held.
-  const TUniqueId& fragment_instance_id() const { return 
exec_params_.instance_id; }
-  FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; }
-  MonotonicStopWatch* stopwatch() { return &stopwatch_; }
-  const TNetworkAddress& impalad_address() const { return exec_params_.host; }
-  int64_t total_split_size() const { return total_split_size_; }
-  bool done() const { return done_; }
-  int per_fragment_instance_idx() const { return 
exec_params_.per_fragment_instance_idx; }
-  bool rpc_sent() const { return rpc_sent_; }
-  int64_t rpc_latency() const { return rpc_latency_; }
-
-  mutex* lock() { return &lock_; }
-
-  void set_status(const Status& status) { status_ = status; }
-  void set_done(bool done) { done_ = done; }
-  void set_rpc_latency(int64_t millis) {
-    DCHECK_EQ(rpc_latency_, 0);
-    rpc_latency_ = millis;
-  }
-
-  // Return values of the following functions must be accessed with lock() held
-  RuntimeProfile* profile() const { return profile_; }
-  void set_profile(RuntimeProfile* profile) { profile_ = profile; }
-  FragmentInstanceCounters* aggregate_counters() { return 
&aggregate_counters_; }
-  ErrorLogMap* error_log() { return &error_log_; }
-  Status* status() { return &status_; }
-
-  /// Registers that the fragment instance's profile has been created and initially
-  /// populated. Returns whether the profile had already been initialised so that callers
-  /// can tell if they are the first to do so. Not thread-safe.
-  bool SetProfileCreated() {
-    bool cur = profile_created_;
-    profile_created_ = true;
-    return cur;
-  }
-
- private:
-  const FInstanceExecParams& exec_params_;
-
-  /// Wall clock timer for this fragment.
-  MonotonicStopWatch stopwatch_;
-
-  /// Summed across all splits; in bytes.
-  int64_t total_split_size_;
-
-  /// Protects fields below. Can be held while doing an RPC, so SpinLock is a bad idea.
-  /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_
-  mutex lock_;
-
-  /// If the status indicates an error status, execution of this fragment has either been
-  /// aborted by the executing impalad (which then reported the error) or cancellation has
-  /// been initiated; either way, execution must not be cancelled.
-  Status status_;
-
-  /// Owned by coordinator object pool provided in the c'tor
-  RuntimeProfile* profile_;
-
-  /// Errors reported by this fragment instance.
-  ErrorLogMap error_log_;
-
-  /// Total scan ranges complete across all scan nodes.
-  int64_t total_ranges_complete_;
-
-  /// Summary counters aggregated across the duration of execution.
-  FragmentInstanceCounters aggregate_counters_;
-
-  /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
-  int64_t rpc_latency_;
-
-  /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
-  /// successful.
-  bool rpc_sent_;
-
-  /// If true, execution terminated; do not cancel in that case.
-  bool done_;
-
-  /// True after the first call to profile->Update()
-  bool profile_created_;
-};
-
-/// Represents a runtime filter target.
-struct Coordinator::FilterTarget {
-  TPlanNodeId node_id;
-  bool is_local;
-  bool is_bound_by_partition_columns;
-
-  // indices into fragment_instance_states_
-  unordered_set<int> fragment_instance_state_idxs;
-
-  FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
-    node_id = tFilterTarget.node_id;
-    is_bound_by_partition_columns = 
tFilterTarget.is_bound_by_partition_columns;
-    is_local = tFilterTarget.is_local_target;
-  }
-};
-
-
-/// State of filters that are received for aggregation.
-///
-/// A broadcast join filter is published as soon as the first update is received for it
-/// and subsequent updates are ignored (as they will be the same).
-/// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is
-/// published once 'pending_count' reaches 0 and if the filter was not disabled before
-/// that.
-///
-/// A filter is disabled if an always_true filter update is received, an OOM is hit,
-/// filter aggregation is complete or if the query is complete.
-/// Once a filter is disabled, subsequent updates for that filter are ignored.
-class Coordinator::FilterState {
- public:
-  FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src) : 
desc_(desc),
-      src_(src), pending_count_(0), first_arrival_time_(0L), 
completion_time_(0L),
-      disabled_(false) { }
-
-  TBloomFilter* bloom_filter() { return bloom_filter_.get(); }
-  boost::unordered_set<int>* src_fragment_instance_state_idxs() {
-    return &src_fragment_instance_state_idxs_;
-  }
-  const boost::unordered_set<int>& src_fragment_instance_state_idxs() const {
-    return src_fragment_instance_state_idxs_;
-  }
-  std::vector<FilterTarget>* targets() { return &targets_; }
-  const std::vector<FilterTarget>& targets() const { return targets_; }
-  int64_t first_arrival_time() const { return first_arrival_time_; }
-  int64_t completion_time() const { return completion_time_; }
-  const TPlanNodeId& src() const { return src_; }
-  const TRuntimeFilterDesc& desc() const { return desc_; }
-  int pending_count() const { return pending_count_; }
-  void set_pending_count(int pending_count) { pending_count_ = pending_count; }
-  bool disabled() const { return disabled_; }
-
-  /// Aggregates partitioned join filters and updates memory consumption.
-  /// Disables filter if always_true filter is received or OOM is hit.
-  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
-
-  /// Disables a filter. A disabled filter consumes no memory.
-  void Disable(MemTracker* tracker);
-
- private:
-  /// Contains the specification of the runtime filter.
-  TRuntimeFilterDesc desc_;
-
-  TPlanNodeId src_;
-  std::vector<FilterTarget> targets_;
-
-  // Index into fragment_instance_states_ for source fragment instances.
-  boost::unordered_set<int> src_fragment_instance_state_idxs_;
-
-  /// Number of remaining backends to hear from before filter is complete.
-  int pending_count_;
-
-  /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
-  /// destination plan fragment instances. Owned by this object so that it can be
-  /// deallocated once finished with. Only set for partitioned joins (broadcast joins
-  /// need no aggregation).
-  /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
-  /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
-  /// the filter is moved from the following member to the output structure.
-  std::unique_ptr<TBloomFilter> bloom_filter_;
-
-  /// Time at which first local filter arrived.
-  int64_t first_arrival_time_;
-
-  /// Time at which all local filters arrived.
-  int64_t completion_time_;
-
-  /// True if the filter is permanently disabled for this query.
-  bool disabled_;
-
-  /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_
-  /// for every filter update.
-
-};
-
-void Coordinator::InstanceState::ComputeTotalSplitSize(
-    const PerNodeScanRanges& per_node_scan_ranges) {
-  total_split_size_ = 0;
-
-  for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) {
-    for (const TScanRangeParams& scan_range_params: entry.second) {
-      if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
-      total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length;
-    }
-  }
-}
-
-int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() {
-  int64_t total = 0;
-  CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters;
-  for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) {
-    total += i->second->value();
-  }
-  int64_t delta = total - total_ranges_complete_;
-  total_ranges_complete_ = total;
-  DCHECK_GE(delta, 0);
-  return delta;
-}
-
-Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
-    RuntimeProfile::EventSequence* events)
+Coordinator::Coordinator(
+    const QuerySchedule& schedule, RuntimeProfile::EventSequence* events)
   : schedule_(schedule),
-    exec_env_(exec_env),
-    has_called_wait_(false),
-    returned_all_results_(false),
-    query_state_(nullptr),
-    num_remaining_fragment_instances_(0),
-    obj_pool_(new ObjectPool()),
-    query_events_(events),
-    filter_routing_table_complete_(false),
     filter_mode_(schedule.query_options().runtime_filter_mode),
-    torn_down_(false) {}
+    obj_pool_(new ObjectPool()),
+    query_events_(events) {}
 
 Coordinator::~Coordinator() {
   DCHECK(torn_down_) << "TearDown() must be called before Coordinator is 
destroyed";
 }
 
-PlanFragmentExecutor* Coordinator::executor() {
-  return (coord_instance_ == nullptr) ? nullptr : coord_instance_->executor();
-}
-
-TExecNodePhase::type GetExecNodePhase(const string& key) {
-  map<int, const char*>::const_iterator entry =
-      _TExecNodePhase_VALUES_TO_NAMES.begin();
-  for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) {
-    if (iequals(key, (*entry).second)) {
-      return static_cast<TExecNodePhase::type>(entry->first);
-    }
-  }
-  return TExecNodePhase::INVALID;
-}
-
-TDebugAction::type GetDebugAction(const string& key) {
-  map<int, const char*>::const_iterator entry =
-      _TDebugAction_VALUES_TO_NAMES.begin();
-  for (; entry != _TDebugAction_VALUES_TO_NAMES.end(); ++entry) {
-    if (iequals(key, (*entry).second)) {
-      return static_cast<TDebugAction::type>(entry->first);
-    }
-  }
-  return TDebugAction::WAIT;
-}
-
-static void ProcessQueryOptions(
-    const TQueryOptions& query_options, DebugOptions* debug_options) {
-  DCHECK(debug_options != NULL);
-  if (!query_options.__isset.debug_action || 
query_options.debug_action.empty()) {
-    debug_options->phase = TExecNodePhase::INVALID;  // signal not set
-    return;
-  }
-  vector<string> components;
-  split(components, query_options.debug_action, is_any_of(":"), 
token_compress_on);
-  if (components.size() < 3 || components.size() > 4) return;
-  if (components.size() == 3) {
-    debug_options->instance_state_idx = -1;
-    debug_options->node_id = atoi(components[0].c_str());
-    debug_options->phase = GetExecNodePhase(components[1]);
-    debug_options->action = GetDebugAction(components[2]);
-  } else {
-    debug_options->instance_state_idx = atoi(components[0].c_str());
-    debug_options->node_id = atoi(components[1].c_str());
-    debug_options->phase = GetExecNodePhase(components[2]);
-    debug_options->action = GetDebugAction(components[3]);
-  }
-  DCHECK(!(debug_options->phase == TExecNodePhase::CLOSE &&
-           debug_options->action == TDebugAction::WAIT))
-      << "Do not use CLOSE:WAIT debug actions "
-      << "because nodes cannot be cancelled in Close()";
-}
-
 Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
   DCHECK(request.plan_exec_info.size() > 0);
@@ -438,19 +121,21 @@ Status Coordinator::Exec() {
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
              << " stmt=" << request.query_ctx.client_request.stmt;
   stmt_type_ = request.stmt_type;
-  query_id_ = schedule_.query_id();
-  desc_tbl_ = request.desc_tbl;
   query_ctx_ = request.query_ctx;
+  // set descriptor table here globally
+  // TODO: remove TQueryExecRequest.desc_tbl
+  query_ctx_.__set_desc_tbl(request.desc_tbl);
+  query_ctx_.__set_request_pool(schedule_.request_pool());
 
   query_profile_.reset(
-      new RuntimeProfile(obj_pool(), "Execution Profile " + 
PrintId(query_id_)));
+      new RuntimeProfile(obj_pool(), "Execution Profile " + 
PrintId(query_id())));
   finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
   filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", 
TUnit::UNIT);
 
   SCOPED_TIMER(query_profile_->total_time_counter());
 
   // initialize progress updater
-  const string& str = Substitute("Query $0", PrintId(query_id_));
+  const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
   // runtime filters not yet supported for mt execution
@@ -460,182 +145,279 @@ Status Coordinator::Exec() {
   // to keep things simple, make async Cancel() calls wait until plan fragment
   // execution has been initiated, otherwise we might try to cancel fragment
   // execution at Impala daemons where it hasn't even started
+  // TODO: revisit this, it may not be true anymore
   lock_guard<mutex> l(lock_);
 
-  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx_, schedule_.request_pool());
+  query_state_ = 
ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_);
   filter_mem_tracker_.reset(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), 
false));
 
-  InitExecProfiles();
-  InitExecSummary();
-  StartFInstances();
+  InitFragmentStats();
+  // create BackendStates and per-instance state, including profiles, and install
+  // the latter in the FragmentStats' root profile
+  InitBackendStates();
+  exec_summary_.Init(schedule_);
 
-  // In the error case, it's safe to return and not to get coord_sink_ here to close - if
-  // there was an error, but the coordinator fragment was successfully started, it should
-  // cancel itself when it receives an error status after reporting its profile.
-  RETURN_IF_ERROR(FinishInstanceStartup());
+  // TODO-MT: populate the runtime filter routing table
+  // This requires local aggregation of filters prior to sending
+  // for broadcast joins in order to avoid more complicated merge logic here.
 
-  // Grab executor and wait until Prepare() has finished so that runtime state etc. will
-  // be set up. Must do this here in order to get a reference to coord_instance_
-  // so that coord_sink_ remains valid throughout query lifetime.
+  if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    DCHECK_EQ(request.plan_exec_info.size(), 1);
+    // Populate the runtime filter routing table. This should happen before starting the
+    // fragment instances. This code anticipates the indices of the instance states
+    // created later on in ExecRemoteFragment()
+    InitFilterRoutingTable();
+  }
+
+  // At this point, all static setup is done and all structures are initialized.
+  // Only runtime-related state changes past this point (examples:
+  // num_remaining_backends_, fragment instance profiles, etc.)
+
+  StartBackendExec();
+  RETURN_IF_ERROR(FinishBackendStartup());
+
+  // set coord_instance_ and coord_sink_
   if (schedule_.GetCoordFragment() != nullptr) {
-    coord_instance_ = query_state_->GetFInstanceState(query_id_);
+    // this blocks until all fragment instances have finished their Prepare phase
+    coord_instance_ = query_state_->GetFInstanceState(query_id());
     if (coord_instance_ == nullptr) {
-      // Coordinator instance might have failed and unregistered itself even
-      // though it was successfully started (e.g. Prepare() might have failed).
-      InstanceState* coord_state = fragment_instance_states_[0];
-      DCHECK(coord_state != nullptr);
-      lock_guard<mutex> instance_state_lock(*coord_state->lock());
-      // Try and return the fragment instance status if it was already set.
-      // TODO: Consider waiting for coord_state->done() here.
-      RETURN_IF_ERROR(*coord_state->status());
-      return Status(
-          Substitute("Coordinator fragment instance ($0) failed", 
PrintId(query_id_)));
+      // at this point, the query is done with the Prepare phase, and we expect
+      // to have a coordinator instance, but coord_instance_ == nullptr,
+      // which means we failed Prepare
+      Status prepare_status = query_state_->WaitForPrepare();
+      DCHECK(!prepare_status.ok());
+      return prepare_status;
     }
 
-    // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At
-    // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the
+    // When GetFInstanceState() returns the coordinator instance, the Prepare phase
+    // is done and the FragmentInstanceState's root sink will be set up. At that point,
+    // the coordinator must be sure to call root_sink()->CloseConsumer(); the
     // fragment instance's executor will not complete until that point.
+    // TODO: what does this mean?
     // TODO: Consider moving this to Wait().
-    Status prepare_status = executor()->WaitForPrepare();
-    coord_sink_ = executor()->root_sink();
-    RETURN_IF_ERROR(prepare_status);
+    // TODO: clarify need for synchronization on this event
+    DCHECK(coord_instance_->IsPrepared() && 
coord_instance_->WaitForPrepare().ok());
+    coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
 
-  PrintFragmentInstanceInfo();
   return Status::OK();
 }
 
-void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& 
fragment_params) {
+// Creates one FragmentStats object per plan fragment returned by
+// schedule_.GetTPlanFragments(), appends it to fragment_stats_, and registers its
+// averaged and root profiles as children of query_profile_.
+void Coordinator::InitFragmentStats() {
+  vector<const TPlanFragment*> fragments;
+  schedule_.GetTPlanFragments(&fragments);
+  const TPlanFragment* coord_fragment = schedule_.GetCoordFragment();
+
+  for (const TPlanFragment* fragment: fragments) {
+    // the coordinator fragment's root profile gets a distinguishing name prefix
+    string root_profile_name =
+        Substitute(
+          fragment == coord_fragment ? "Coordinator Fragment $0" : "Fragment $0",
+          fragment->display_name);
+    string avg_profile_name =
+        Substitute("Averaged Fragment $0", fragment->display_name);
+    int num_instances =
+        schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
+    // TODO: special-case the coordinator fragment?
+    // the FragmentStats object is handed to obj_pool(), which is also passed to its
+    // constructor
+    FragmentStats* fragment_stats = obj_pool()->Add(
+        new FragmentStats(
+          avg_profile_name, root_profile_name, num_instances, obj_pool()));
+    fragment_stats_.push_back(fragment_stats);
+    // NOTE(review): the 'true' argument presumably requests indentation of the
+    // averaged profile - confirm against RuntimeProfile::AddChild()
+    query_profile_->AddChild(fragment_stats->avg_profile(), true);
+    query_profile_->AddChild(fragment_stats->root_profile());
+  }
+}
+
+// Groups the schedule's FInstanceExecParams by host and creates one BackendState per
+// host in backend_states_. If the query has a coordinator fragment, also records in
+// coord_backend_idx_ the index of the backend whose address matches this process's
+// backend address.
+void Coordinator::InitBackendStates() {
+  int num_backends = schedule_.unique_hosts().size();
+  DCHECK_GT(num_backends, 0);
+  backend_states_.resize(num_backends);
+
+  // collect the FInstanceExecParams for each host
+  typedef map<TNetworkAddress, vector<const FInstanceExecParams*>> BackendParamsMap;
+  BackendParamsMap backend_params_map;
+  for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
+    for (const FInstanceExecParams& instance_params:
+        fragment_params.instance_exec_params) {
+      backend_params_map[instance_params.host].push_back(&instance_params);
+    }
+  }
+
+  // create BackendStates
+  // NOTE(review): backend_states_ was sized from schedule_.unique_hosts() but is
+  // filled by iterating backend_params_map; this assumes both contain the same set
+  // of hosts - confirm against the scheduler
+  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
+  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
+  int backend_idx = 0;
+  for (const auto& entry: backend_params_map) {
+    if (has_coord_fragment && coord_address == entry.first) {
+      coord_backend_idx_ = backend_idx;
+    }
+    BackendState* backend_state = obj_pool()->Add(
+        new BackendState(query_id(), backend_idx, filter_mode_));
+    backend_state->Init(entry.second, fragment_stats_, obj_pool());
+    backend_states_[backend_idx++] = backend_state;
+  }
+  // a query with a coordinator fragment must have found its coordinator backend
+  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
+}
+
+// Populates thrift_exec_summary.nodes with one TPlanNodeExecSummary per plan node of
+// every fragment that has a plan, records each node's position in
+// node_id_to_idx_map, and fills thrift_exec_summary.exch_to_sender_map for fragments
+// whose output sink is a data stream sink.
+void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
+  const TQueryExecRequest& request = schedule.request();
+  // init exec_summary_.{nodes, exch_to_sender_map}
+  thrift_exec_summary.__isset.nodes = true;
+  DCHECK(thrift_exec_summary.nodes.empty());
+  for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      if (!fragment.__isset.plan) continue;
+
+      // eventual index of fragment's root node in exec_summary_.nodes
+      int root_node_idx = thrift_exec_summary.nodes.size();
+
+      const TPlan& plan = fragment.plan;
+      int num_instances =
+          schedule.GetFragmentExecParams(fragment.idx).instance_exec_params.size();
+      for (const TPlanNode& node: plan.nodes) {
+        node_id_to_idx_map[node.node_id] = thrift_exec_summary.nodes.size();
+        thrift_exec_summary.nodes.emplace_back();
+        TPlanNodeExecSummary& node_summary = thrift_exec_summary.nodes.back();
+        node_summary.__set_node_id(node.node_id);
+        node_summary.__set_fragment_idx(fragment.idx);
+        node_summary.__set_label(node.label);
+        node_summary.__set_label_detail(node.label_detail);
+        node_summary.__set_num_children(node.num_children);
+        if (node.__isset.estimated_stats) {
+          node_summary.__set_estimated_stats(node.estimated_stats);
+        }
+        // one exec_stats slot per instance of this fragment
+        node_summary.exec_stats.resize(num_instances);
+      }
+
+      if (fragment.__isset.output_sink
+          && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
+        const TDataStreamSink& sink = fragment.output_sink.stream_sink;
+        // NOTE(review): this lookup presumably relies on the destination exchange
+        // node having been registered by an earlier fragment; if
+        // node_id_to_idx_map is a standard map, operator[] would otherwise
+        // default-insert an entry - confirm fragment ordering
+        int exch_idx = node_id_to_idx_map[sink.dest_node_id];
+        if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
+          thrift_exec_summary.nodes[exch_idx].__set_is_broadcast(true);
+        }
+        thrift_exec_summary.__isset.exch_to_sender_map = true;
+        thrift_exec_summary.exch_to_sender_map[exch_idx] = root_node_idx;
+      }
+    }
+  }
+}
+
+void Coordinator::InitFilterRoutingTable() {
   DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 
0);
-  int num_hosts = fragment_params.instance_exec_params.size();
-  DCHECK_GT(num_hosts, 0);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
-      << "UpdateFilterRoutingTable() called although runtime filters are 
disabled";
+      << "InitFilterRoutingTable() called although runtime filters are 
disabled";
   DCHECK(!filter_routing_table_complete_)
-      << "UpdateFilterRoutingTable() called after setting 
filter_routing_table_complete_";
+      << "InitFilterRoutingTable() called after setting 
filter_routing_table_complete_";
 
-  for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
-    if (!plan_node.__isset.runtime_filters) continue;
-    for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
-      if (filter_mode_ == TRuntimeFilterMode::LOCAL && 
!filter.has_local_targets) {
-        continue;
-      }
-      FilterRoutingTable::iterator i = filter_routing_table_.emplace(
-          filter.filter_id, FilterState(filter, plan_node.node_id)).first;
-      FilterState* f = &(i->second);
-      if (plan_node.__isset.hash_join_node) {
-        // Set the 'pending_count_' to zero to indicate that for a filter with local-only
-        // targets the coordinator does not expect to receive any filter updates.
-        int pending_count = filter.is_broadcast_join ?
-            (filter.has_remote_targets ? 1 : 0) : num_hosts;
-        f->set_pending_count(pending_count);
-        vector<int> src_idxs = fragment_params.GetInstanceIdxs();
-
-        // If this is a broadcast join with only non-local targets, build and publish it
-        // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
-        // or it is a broadcast join with local targets, it should be generated
-        // everywhere the join is executed.
-        if (filter.is_broadcast_join && !filter.has_local_targets
-            && num_hosts > MAX_BROADCAST_FILTER_PRODUCERS) {
-          random_shuffle(src_idxs.begin(), src_idxs.end());
-          src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
-        }
-        f->src_fragment_instance_state_idxs()->insert(src_idxs.begin(), 
src_idxs.end());
-      } else if (plan_node.__isset.hdfs_scan_node) {
-        auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
-        DCHECK(it != filter.planid_to_target_ndx.end());
-        const TRuntimeFilterTargetDesc& tFilterTarget = 
filter.targets[it->second];
-        if (filter_mode_ == TRuntimeFilterMode::LOCAL && 
!tFilterTarget.is_local_target) {
+  for (const FragmentExecParams& fragment_params: 
schedule_.fragment_exec_params()) {
+    int num_instances = fragment_params.instance_exec_params.size();
+    DCHECK_GT(num_instances, 0);
+
+    for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
+      if (!plan_node.__isset.runtime_filters) continue;
+      for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
+        if (filter_mode_ == TRuntimeFilterMode::LOCAL && 
!filter.has_local_targets) {
           continue;
         }
-        vector<int> idxs = fragment_params.GetInstanceIdxs();
-        FilterTarget target(tFilterTarget);
-        target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end());
-        f->targets()->push_back(target);
-      } else {
-        DCHECK(false) << "Unexpected plan node with runtime filters: "
-            << ThriftDebugString(plan_node);
+        FilterRoutingTable::iterator i = filter_routing_table_.emplace(
+            filter.filter_id, FilterState(filter, plan_node.node_id)).first;
+        FilterState* f = &(i->second);
+
+        // source plan node of filter
+        if (plan_node.__isset.hash_join_node) {
+          // Set the 'pending_count_' to zero to indicate that for a filter with
+          // local-only targets the coordinator does not expect to receive any filter
+          // updates.
+          int pending_count = filter.is_broadcast_join
+              ? (filter.has_remote_targets ? 1 : 0) : num_instances;
+          f->set_pending_count(pending_count);
+
+          // determine source instances
+          // TODO: store this in FInstanceExecParams, not in FilterState
+          vector<int> src_idxs = fragment_params.GetInstanceIdxs();
+
+          // If this is a broadcast join with only non-local targets, build and publish it
+          // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
+          // or it is a broadcast join with local targets, it should be generated
+          // everywhere the join is executed.
+          if (filter.is_broadcast_join && !filter.has_local_targets
+              && num_instances > MAX_BROADCAST_FILTER_PRODUCERS) {
+            random_shuffle(src_idxs.begin(), src_idxs.end());
+            src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
+          }
+          f->src_fragment_instance_idxs()->insert(src_idxs.begin(), 
src_idxs.end());
+
+        // target plan node of filter
+        } else if (plan_node.__isset.hdfs_scan_node) {
+          auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
+          DCHECK(it != filter.planid_to_target_ndx.end());
+          const TRuntimeFilterTargetDesc& t_target = 
filter.targets[it->second];
+          if (filter_mode_ == TRuntimeFilterMode::LOCAL && 
!t_target.is_local_target) {
+            continue;
+          }
+          f->targets()->emplace_back(t_target, fragment_params.fragment.idx);
+        } else {
+          DCHECK(false) << "Unexpected plan node with runtime filters: "
+              << ThriftDebugString(plan_node);
+        }
       }
     }
   }
-}
 
-void Coordinator::StartFInstances() {
-  int num_fragment_instances = schedule_.GetNumFragmentInstances();
-  DCHECK_GT(num_fragment_instances, 0);
-
-  fragment_instance_states_.resize(num_fragment_instances);
-  exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances));
-  num_remaining_fragment_instances_ = num_fragment_instances;
+  query_profile_->AddInfoString(
+      "Number of filters", Substitute("$0", filter_routing_table_.size()));
+  query_profile_->AddInfoString("Filter routing table", FilterDebugString());
+  if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
+  filter_routing_table_complete_ = true;
+}
 
-  DebugOptions debug_options;
-  ProcessQueryOptions(schedule_.query_options(), &debug_options);
-  const TQueryExecRequest& request = schedule_.request();
+void Coordinator::StartBackendExec() {
+  int num_backends = backend_states_.size();
+  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+  num_remaining_backends_ = num_backends;
 
-  VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances 
for query "
-             << query_id_;
-  query_events_->MarkEvent(
-      Substitute("Ready to start $0 fragment instances", 
num_fragment_instances));
+  DebugOptions debug_options(schedule_.query_options());  // local; captured by reference in the lambdas below
 
-  // TODO-MT: populate the runtime filter routing table
-  // This requires local aggregation of filters prior to sending
-  // for broadcast joins in order to avoid more complicated merge logic here.
+  VLOG_QUERY << "starting execution on " << num_backends << " backends for 
query "
+             << query_id();
+  query_events_->MarkEvent(Substitute("Ready to start on $0 backends", 
num_backends));
 
-  if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    DCHECK_EQ(request.plan_exec_info.size(), 1);
-    // Populate the runtime filter routing table. This should happen before 
starting the
-    // fragment instances. This code anticipates the indices of the instance 
states
-    // created later on in ExecRemoteFragment()
-    for (const FragmentExecParams& fragment_params: 
schedule_.fragment_exec_params()) {
-      UpdateFilterRoutingTable(fragment_params);
-    }
-    MarkFilterRoutingTableComplete();
+  for (BackendState* backend_state: backend_states_) {  // one task per backend is offered to the exec RPC thread pool
+    ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
+        [backend_state, this, &debug_options]() {  // NOTE(review): by-ref capture presumably relies on the Wait() below not returning until every Exec() is done -- confirm
+          backend_state->Exec(query_ctx_, debug_options, filter_routing_table_,
+            exec_complete_barrier_.get());  // NOTE(review): presumably Exec() notifies this barrier when it finishes -- confirm
+        });
   }
 
-  int num_instances = 0;
-  for (const FragmentExecParams& fragment_params: 
schedule_.fragment_exec_params()) {
-    num_instances += fragment_params.instance_exec_params.size();
-    for (const FInstanceExecParams& instance_params:
-        fragment_params.instance_exec_params) {
-      InstanceState* exec_state = obj_pool()->Add(
-          new InstanceState(instance_params, obj_pool()));
-      int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
-      fragment_instance_states_[instance_state_idx] = exec_state;
-
-      DebugOptions* instance_debug_options =
-          debug_options.IsApplicable(instance_state_idx) ? &debug_options : 
NULL;
-      exec_env_->fragment_exec_thread_pool()->Offer(
-          std::bind(&Coordinator::ExecRemoteFInstance,
-            this, std::cref(instance_params), instance_debug_options));
-    }
-  }
   exec_complete_barrier_->Wait();
-  VLOG_QUERY << "started " << num_fragment_instances << " fragment instances 
for query "
-      << query_id_;
+  VLOG_QUERY << "started execution on " << num_backends << " backends for 
query "
+             << query_id();
   query_events_->MarkEvent(
-      Substitute("All $0 fragment instances started", num_instances));
+      Substitute("All $0 execution backends ($1 fragment instances) started",
+        num_backends, schedule_.GetNumFragmentInstances()));
 }
 
-Status Coordinator::FinishInstanceStartup() {
+Status Coordinator::FinishBackendStartup() {
   Status status = Status::OK();
   const TMetricDef& def =
-      MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, 
TUnit::TIME_MS);
+      MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, 
TUnit::TIME_MS);
   HistogramMetric latencies(def, 20000, 3);
-  for (InstanceState* exec_state: fragment_instance_states_) {
-    lock_guard<mutex> l(*exec_state->lock());
-    // Preserve the first non-OK status, if there is one
-    if (status.ok()) status = *exec_state->status();
-    latencies.Update(exec_state->rpc_latency());
+  for (BackendState* backend_state: backend_states_) {
+    // preserve the first non-OK status, if there is one
+    Status backend_status = backend_state->GetStatus();
+    if (!backend_status.ok() && status.ok()) status = backend_status;
+    latencies.Update(backend_state->rpc_latency());
   }
 
   query_profile_->AddInfoString(
-      "Fragment instance start latencies", latencies.ToHumanReadable());
+      "Backend startup latencies", latencies.ToHumanReadable());
 
   if (!status.ok()) {
-    DCHECK(query_status_.ok()); // nobody should have been able to cancel
+    // TODO: do not allow cancellation via the debug page until Exec() has 
returned
+    //DCHECK(query_status_.ok()); // nobody should have been able to cancel
     query_status_ = status;
     CancelInternal();
   }
@@ -647,7 +429,6 @@ string Coordinator::FilterDebugString() {
   table_printer.AddColumn("ID", false);
   table_printer.AddColumn("Src. Node", false);
   table_printer.AddColumn("Tgt. Node(s)", false);
-  table_printer.AddColumn("Targets", false);
   table_printer.AddColumn("Target type", false);
   table_printer.AddColumn("Partition filter", false);
 
@@ -665,25 +446,21 @@ string Coordinator::FilterDebugString() {
     row.push_back(lexical_cast<string>(v.first));
     row.push_back(lexical_cast<string>(state.src()));
     vector<string> target_ids;
-    vector<string> num_target_instances;
     vector<string> target_types;
     vector<string> partition_filter;
     for (const FilterTarget& target: state.targets()) {
       target_ids.push_back(lexical_cast<string>(target.node_id));
-      num_target_instances.push_back(
-          lexical_cast<string>(target.fragment_instance_state_idxs.size()));
       target_types.push_back(target.is_local ? "LOCAL" : "REMOTE");
       partition_filter.push_back(target.is_bound_by_partition_columns ? "true" 
: "false");
     }
     row.push_back(join(target_ids, ", "));
-    row.push_back(join(num_target_instances, ", "));
     row.push_back(join(target_types, ", "));
     row.push_back(join(partition_filter, ", "));
 
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
       int pending_count = state.completion_time() != 0L ? 0 : 
state.pending_count();
       row.push_back(Substitute("$0 ($1)", pending_count,
-          state.src_fragment_instance_state_idxs().size()));
+          state.src_fragment_instance_idxs().size()));
       if (state.first_arrival_time() == 0L) {
         row.push_back("N/A");
       } else {
@@ -704,16 +481,6 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-void Coordinator::MarkFilterRoutingTableComplete() {
-  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
-      << "MarkFilterRoutingTableComplete() called although runtime filters are 
disabled";
-  query_profile_->AddInfoString(
-      "Number of filters", Substitute("$0", filter_routing_table_.size()));
-  query_profile_->AddInfoString("Filter routing table", FilterDebugString());
-  if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
-  filter_routing_table_complete_ = true;
-}
-
 Status Coordinator::GetStatus() {
   lock_guard<mutex> l(lock_);
   return query_status_;
@@ -724,7 +491,7 @@ Status Coordinator::UpdateStatus(const Status& status, 
const TUniqueId& instance
   {
     lock_guard<mutex> l(lock_);
 
-    // The query is done and we are just waiting for fragment instances to 
clean up.
+    // The query is done and we are just waiting for backends to clean up.
     // Ignore their cancelled updates.
     if (returned_all_results_ && status.IsCancelled()) return query_status_;
 
@@ -738,8 +505,8 @@ Status Coordinator::UpdateStatus(const Status& status, 
const TUniqueId& instance
     CancelInternal();
   }
 
-  // Log the id of the fragment that first failed so we can track it down 
easier.
-  VLOG_QUERY << "Query id=" << query_id_ << " failed because fragment id="
+  // Log the id of the fragment instance that first failed so we can track it down more 
easily.
+  VLOG_QUERY << "Query id=" << query_id() << " failed because instance id="
              << instance_id << " on host=" << instance_hostname << " failed.";
 
   return query_status_;
@@ -789,7 +556,7 @@ void Coordinator::PopulatePathPermissionCache(hdfsFS fs, 
const string& path_str,
     PermissionCache::const_iterator it = permissions_cache->find(path);
     if (it == permissions_cache->end()) {
       hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
-      if (info != NULL) {
+      if (info != nullptr) {
         // File exists, so fill the cache with its current permissions.
         permissions_cache->insert(
             make_pair(path, make_pair(false, info->mPermissions)));
@@ -815,11 +582,15 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   // 1. If OVERWRITE, remove all the files in the target directory
   // 2. Create all the necessary partition directories.
   DescriptorTbl* descriptor_table;
-  DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table);
+  // TODO: add DescriptorTbl::CreateTableDescriptor() so we can create a
+  // descriptor for just the output table, calling Create() can be very
+  // expensive.
+  DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, nullptr, 
&descriptor_table);
   HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>(
       descriptor_table->GetTableDescriptor(finalize_params_.table_id));
-  DCHECK(hdfs_table != NULL) << "INSERT target table not known in descriptor 
table: "
-                             << finalize_params_.table_id;
+  DCHECK(hdfs_table != nullptr)
+      << "INSERT target table not known in descriptor table: "
+      << finalize_params_.table_id;
 
   // Loop over all partitions that were updated by this insert, and create the 
set of
   // filesystem operations required to create the correct partition structure 
on disk.
@@ -843,9 +614,9 @@ Status Coordinator::FinalizeSuccessfulInsert() {
       part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
     } else {
       HdfsPartitionDescriptor* part = 
hdfs_table->GetPartition(partition.second.id);
-      DCHECK(part != NULL) << "table_id=" << hdfs_table->id()
-                           << " partition_id=" << partition.second.id
-                           << "\n" <<  
PrintThrift(runtime_state()->instance_ctx());
+      DCHECK(part != nullptr)
+          << "table_id=" << hdfs_table->id() << " partition_id=" << 
partition.second.id
+          << "\n" <<  PrintThrift(runtime_state()->instance_ctx());
       part_path_ss << part->location();
     }
     const string& part_path = part_path_ss.str();
@@ -869,15 +640,15 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         errno = 0;
         hdfsFileInfo* existing_files =
             hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
-        if (existing_files == NULL && errno == EAGAIN) {
+        if (existing_files == nullptr && errno == EAGAIN) {
           errno = 0;
           existing_files =
               hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
         }
-        // hdfsListDirectory() returns NULL not only when there is an error 
but also
+        // hdfsListDirectory() returns nullptr not only when there is an error 
but also
         // when the directory is empty(HDFS-8407). Need to check errno to make 
sure
         // the call fails.
-        if (existing_files == NULL && errno != 0) {
+        if (existing_files == nullptr && errno != 0) {
           return GetHdfsErrorMsg("Could not list directory: ", part_path);
         }
         for (int i = 0; i < num_files; ++i) {
@@ -927,7 +698,8 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   {
     SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, 
"Overwrite/PartitionCreationTimer",
           "FinalizationTimer"));
-    if (!partition_create_ops.Execute(exec_env_->hdfs_op_thread_pool(), 
false)) {
+    if (!partition_create_ops.Execute(
+        ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
       for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
         // It's ok to ignore errors creating the directories, since they may 
already
         // exist. If there are permission errors, we'll run into them later.
@@ -962,7 +734,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
 
   {
     SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", 
"FinalizationTimer"));
-    if (!move_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
+    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
       stringstream ss;
       ss << "Error(s) moving partition files. First error (of "
          << move_ops.errors().size() << ") was: " << 
move_ops.errors()[0].second;
@@ -974,7 +746,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   {
     SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
          "FinalizationTimer"));
-    if (!dir_deletion_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
+    if 
(!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
       stringstream ss;
       ss << "Error(s) deleting staging directories. First error (of "
          << dir_deletion_ops.errors().size() << ") was: "
@@ -996,7 +768,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         chmod_ops.Add(CHMOD, perm.first, permissions);
       }
     }
-    if (!chmod_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
+    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), 
false)) {
       stringstream ss;
       ss << "Error(s) setting permissions on newly created partition 
directories. First"
          << " error (of " << chmod_ops.errors().size() << ") was: "
@@ -1015,7 +787,7 @@ Status Coordinator::FinalizeQuery() {
   DCHECK(has_called_wait_);
   DCHECK(needs_finalization_);
 
-  VLOG_QUERY << "Finalizing query: " << query_id_;
+  VLOG_QUERY << "Finalizing query: " << query_id();
   SCOPED_TIMER(finalization_timer_);
   Status return_status = GetStatus();
   if (return_status.ok()) {
@@ -1024,7 +796,7 @@ Status Coordinator::FinalizeQuery() {
 
   stringstream staging_dir;
   DCHECK(finalize_params_.__isset.staging_dir);
-  staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id_,"_") 
<< "/";
+  staging_dir << finalize_params_.staging_dir << "/" << 
PrintId(query_id(),"_") << "/";
 
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), 
&hdfs_conn));
@@ -1034,17 +806,17 @@ Status Coordinator::FinalizeQuery() {
   return return_status;
 }
 
-Status Coordinator::WaitForAllInstances() {
+Status Coordinator::WaitForBackendCompletion() {
   unique_lock<mutex> l(lock_);
-  while (num_remaining_fragment_instances_ > 0 && query_status_.ok()) {
-    VLOG_QUERY << "Coordinator waiting for fragment instances to finish, "
-               << num_remaining_fragment_instances_ << " remaining";
-    instance_completion_cv_.wait(l);
+  while (num_remaining_backends_ > 0 && query_status_.ok()) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, "
+               << num_remaining_backends_ << " remaining";
+    backend_completion_cv_.wait(l);
   }
   if (query_status_.ok()) {
-    VLOG_QUERY << "All fragment instances finished successfully.";
+    VLOG_QUERY << "All backends finished successfully.";
   } else {
-    VLOG_QUERY << "All fragment instances finished due to one or more errors. "
+    VLOG_QUERY << "All backends finished due to one or more errors. "
                << query_status_.GetDetail();
   }
 
@@ -1058,9 +830,9 @@ Status Coordinator::Wait() {
   has_called_wait_ = true;
 
   if (stmt_type_ == TStmtType::QUERY) {
-    DCHECK(executor() != nullptr);
-    return UpdateStatus(executor()->WaitForOpen(), 
runtime_state()->fragment_instance_id(),
-        FLAGS_hostname);
+    DCHECK(coord_instance_ != nullptr);
+    return UpdateStatus(coord_instance_->WaitForOpen(),
+        runtime_state()->fragment_instance_id(), FLAGS_hostname);
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
@@ -1070,7 +842,7 @@ Status Coordinator::Wait() {
   // fragment which will be available after Open() returns.
   // Ignore the returned status if finalization is required., since 
FinalizeQuery() will
   // pick it up and needs to execute regardless.
-  Status status = WaitForAllInstances();
+  Status status = WaitForBackendCompletion();
   if (!needs_finalization_ && !status.ok()) return status;
 
   // Query finalization is required only for HDFS table sinks
@@ -1078,16 +850,14 @@ Status Coordinator::Wait() {
 
   query_profile_->AddInfoString(
       "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
-  // For DML queries, when Wait is done, the query is complete.  Report 
aggregate
-  // query profiles at this point.
-  // TODO: make sure ReportQuerySummary gets called on error
-  ReportQuerySummary();
+  // For DML queries, when Wait is done, the query is complete.
+  ComputeQuerySummary();
 
   return status;
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
-  VLOG_ROW << "GetNext() query_id=" << query_id_;
+  VLOG_ROW << "GetNext() query_id=" << query_id();
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
@@ -1112,7 +882,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int 
max_rows, bool* eos) {
   if (*eos) {
     returned_all_results_ = true;
     // Trigger tear-down of coordinator fragment by closing the consumer. Must 
do before
-    // WaitForAllInstances().
+    // WaitForBackendCompletion().
     coord_sink_->CloseConsumer();
     coord_sink_ = nullptr;
 
@@ -1120,211 +890,23 @@ Status Coordinator::GetNext(QueryResultSet* results, 
int max_rows, bool* eos) {
     // all instances to complete before ultimately signalling the end of 
execution via a
     // NULL batch. After NULL is returned, the coordinator may tear down query 
state, and
     // perform post-query finalization which might depend on the reports from 
all
-    // instances.
+    // backends.
     //
     // TODO: Waiting should happen in TearDown() (and then we wouldn't need to 
call
     // CloseConsumer() here). See IMPALA-4275 for details.
-    RETURN_IF_ERROR(WaitForAllInstances());
-    if (query_status_.ok()) {
-      // If the query completed successfully, report aggregate query profiles.
-      ReportQuerySummary();
-    }
+    RETURN_IF_ERROR(WaitForBackendCompletion());
+    // if the query completed successfully, compute the summary
+    if (query_status_.ok()) ComputeQuerySummary();
   }
 
   return Status::OK();
 }
 
-void Coordinator::PrintFragmentInstanceInfo() {
-  for (InstanceState* state: fragment_instance_states_) {
-    SummaryStats& acc = 
fragment_profiles_[state->fragment_idx()].bytes_assigned;
-    acc(state->total_split_size());
-  }
-
-  for (int id = (executor() == NULL ? 0 : 1); id < fragment_profiles_.size(); 
++id) {
-    SummaryStats& acc = fragment_profiles_[id].bytes_assigned;
-    double min = accumulators::min(acc);
-    double max = accumulators::max(acc);
-    double mean = accumulators::mean(acc);
-    double stddev = sqrt(accumulators::variance(acc));
-    stringstream ss;
-    ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES)
-      << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
-      << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
-      << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
-    fragment_profiles_[id].averaged_profile->AddInfoString("split sizes", 
ss.str());
-
-    if (VLOG_FILE_IS_ON) {
-      VLOG_FILE << "Byte split for fragment " << id << " " << ss.str();
-      for (InstanceState* exec_state: fragment_instance_states_) {
-        if (exec_state->fragment_idx() != id) continue;
-        VLOG_FILE << "data volume for ipaddress " << exec_state << ": "
-                  << PrettyPrinter::Print(exec_state->total_split_size(), 
TUnit::BYTES);
-      }
-    }
-  }
-}
-
-void Coordinator::InitExecSummary() {
-  const TQueryExecRequest& request = schedule_.request();
-  // init exec_summary_.{nodes, exch_to_sender_map}
-  exec_summary_.__isset.nodes = true;
-  DCHECK(exec_summary_.nodes.empty());
-  for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) {
-    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
-      if (!fragment.__isset.plan) continue;
-
-      // eventual index of fragment's root node in exec_summary_.nodes
-      int root_node_idx = exec_summary_.nodes.size();
-
-      const TPlan& plan = fragment.plan;
-      int num_instances =
-          
schedule_.GetFragmentExecParams(fragment.idx).instance_exec_params.size();
-      for (const TPlanNode& node: plan.nodes) {
-        plan_node_id_to_summary_map_[node.node_id] = 
exec_summary_.nodes.size();
-        exec_summary_.nodes.emplace_back();
-        TPlanNodeExecSummary& node_summary = exec_summary_.nodes.back();
-        node_summary.__set_node_id(node.node_id);
-        node_summary.__set_fragment_idx(fragment.idx);
-        node_summary.__set_label(node.label);
-        node_summary.__set_label_detail(node.label_detail);
-        node_summary.__set_num_children(node.num_children);
-        if (node.__isset.estimated_stats) {
-          node_summary.__set_estimated_stats(node.estimated_stats);
-        }
-        node_summary.exec_stats.resize(num_instances);
-      }
-
-      if (fragment.__isset.output_sink
-          && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
-        const TDataStreamSink& sink = fragment.output_sink.stream_sink;
-        int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id];
-        if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
-          exec_summary_.nodes[exch_idx].__set_is_broadcast(true);
-        }
-        exec_summary_.__isset.exch_to_sender_map = true;
-        exec_summary_.exch_to_sender_map[exch_idx] = root_node_idx;
-      }
-    }
-  }
-}
-
-void Coordinator::InitExecProfiles() {
-  vector<const TPlanFragment*> fragments;
-  schedule_.GetTPlanFragments(&fragments);
-  fragment_profiles_.resize(fragments.size());
-
-  const TPlanFragment* coord_fragment = schedule_.GetCoordFragment();
-
-  // Initialize the runtime profile structure. This adds the per fragment 
average
-  // profiles followed by the per fragment instance profiles.
-  for (const TPlanFragment* fragment: fragments) {
-    string profile_name =
-        (fragment == coord_fragment) ? "Coordinator Fragment $0" : "Fragment 
$0";
-    PerFragmentProfileData* data = &fragment_profiles_[fragment->idx];
-    data->num_instances =
-        
schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
-    // TODO-MT: stop special-casing the coordinator fragment
-    if (fragment != coord_fragment) {
-      data->averaged_profile = obj_pool()->Add(new RuntimeProfile(
-          obj_pool(), Substitute("Averaged Fragment $0", 
fragment->display_name), true));
-      query_profile_->AddChild(data->averaged_profile, true);
-    }
-    data->root_profile = obj_pool()->Add(
-        new RuntimeProfile(obj_pool(), Substitute(profile_name, 
fragment->display_name)));
-    // Note: we don't start the wall timer here for the fragment profile;
-    // it's uninteresting and misleading.
-    query_profile_->AddChild(data->root_profile);
-  }
-}
-
-void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
-    FragmentInstanceCounters* counters) {
-  vector<RuntimeProfile*> children;
-  profile->GetAllChildren(&children);
-  for (RuntimeProfile* p: children) {
-    PlanNodeId id = ExecNode::GetNodeIdFromProfile(p);
-
-    // This profile is not for an exec node.
-    if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;
-
-    RuntimeProfile::Counter* throughput_counter =
-        p->GetCounter(ScanNode::TOTAL_THROUGHPUT_COUNTER);
-    if (throughput_counter != NULL) {
-      counters->throughput_counters[id] = throughput_counter;
-    }
-    RuntimeProfile::Counter* scan_ranges_counter =
-        p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
-    if (scan_ranges_counter != NULL) {
-      counters->scan_ranges_complete_counters[id] = scan_ranges_counter;
-    }
-  }
-}
-
-void Coordinator::ExecRemoteFInstance(
-    const FInstanceExecParams& exec_params, const DebugOptions* debug_options) 
{
-  NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
-  TExecPlanFragmentParams rpc_params;
-  SetExecPlanFragmentParams(exec_params, &rpc_params);
-  if (debug_options != NULL) {
-    
rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
-    rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
-    rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
-  }
-  int instance_state_idx = GetInstanceIdx(exec_params.instance_id);
-  InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
-  exec_state->ComputeTotalSplitSize(
-      rpc_params.fragment_instance_ctx.per_node_scan_ranges);
-  VLOG_FILE << "making rpc: ExecPlanFragment"
-      << " host=" << exec_state->impalad_address()
-      << " instance_id=" << PrintId(exec_state->fragment_instance_id());
-
-  // Guard against concurrent UpdateExecStatus() that may arrive after RPC 
returns.
-  lock_guard<mutex> l(*exec_state->lock());
-  int64_t start = MonotonicMillis();
-
-  Status client_connect_status;
-  ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
-      exec_state->impalad_address(), &client_connect_status);
-  if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status, false);
-    return;
-  }
-
-  TExecPlanFragmentResult thrift_result;
-  Status rpc_status = 
backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment,
-      rpc_params, &thrift_result);
-  exec_state->set_rpc_latency(MonotonicMillis() - start);
-
-  const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 
failed: $2";
-
-  if (!rpc_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
-        PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
-    VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg), true);
-    return;
-  }
-
-  Status exec_status = Status(thrift_result.status);
-  if (!exec_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
-        PrintId(exec_state->fragment_instance_id()),
-        exec_status.msg().GetFullMessageDetails());
-    VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg), true);
-    return;
-  }
-
-  exec_state->SetInitialStatus(Status::OK(), true);
-  VLOG_FILE << "rpc succeeded: ExecPlanFragment"
-      << " instance_id=" << PrintId(exec_state->fragment_instance_id());
-}
-
 void Coordinator::Cancel(const Status* cause) {
   lock_guard<mutex> l(lock_);
-  // if the query status indicates an error, cancellation has already been 
initiated
-  if (!query_status_.ok()) return;
+  // if the query status indicates an error, cancellation has already been 
initiated;
   // prevent others from cancelling a second time
+  if (!query_status_.ok()) return;
 
   // TODO: This should default to OK(), not CANCELLED if there is no cause (or 
callers
   // should explicitly pass Status::OK()). Fragment instances may be cancelled 
at the end
@@ -1335,214 +917,105 @@ void Coordinator::Cancel(const Status* cause) {
 }
 
 void Coordinator::CancelInternal() {
-  VLOG_QUERY << "Cancel() query_id=" << query_id_;
-  CancelFragmentInstances();
+  VLOG_QUERY << "Cancel() query_id=" << query_id();
 
-  // Report the summary with whatever progress the query made before being cancelled.
-  ReportQuerySummary();
-}
-
-void Coordinator::CancelFragmentInstances() {
   int num_cancelled = 0;
-  for (InstanceState* exec_state: fragment_instance_states_) {
-    DCHECK(exec_state != nullptr);
-
-    // lock each exec_state individually to synchronize correctly with
-    // UpdateFragmentExecStatus() (which doesn't get the global lock_
-    // to set its status)
-    lock_guard<mutex> l(*exec_state->lock());
-
-    // Nothing to cancel if the exec rpc was not sent
-    if (!exec_state->rpc_sent()) continue;
-
-    // don't cancel if it already finished
-    if (exec_state->done()) continue;
-
-    /// If the status is not OK, we still try to cancel - !OK status might mean
-    /// communication failure between fragment instance and coordinator, but 
fragment
-    /// instance might still be running.
-
-    // set an error status to make sure we only cancel this once
-    exec_state->set_status(Status::CANCELLED);
-
-    // if we get an error while trying to get a connection to the backend,
-    // keep going
-    Status status;
-    ImpalaBackendConnection backend_client(
-        exec_env_->impalad_client_cache(), exec_state->impalad_address(), 
&status);
-    if (!status.ok()) continue;
-    ++num_cancelled;
-    TCancelPlanFragmentParams params;
-    params.protocol_version = ImpalaInternalServiceVersion::V1;
-    params.__set_fragment_instance_id(exec_state->fragment_instance_id());
-    TCancelPlanFragmentResult res;
-    VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id="
-               << exec_state->fragment_instance_id() << " backend="
-               << exec_state->impalad_address();
-    Status rpc_status;
-    // Try to send the RPC 3 times before failing.
-    bool retry_is_safe;
-    for (int i = 0; i < 3; ++i) {
-      rpc_status = 
backend_client.DoRpc(&ImpalaBackendClient::CancelPlanFragment,
-          params, &res, &retry_is_safe);
-      if (rpc_status.ok() || !retry_is_safe) break;
-    }
-    if (!rpc_status.ok()) {
-      exec_state->status()->MergeStatus(rpc_status);
-      stringstream msg;
-      msg << "CancelPlanFragment rpc query_id=" << query_id_
-          << " instance_id=" << exec_state->fragment_instance_id()
-          << " failed: " << rpc_status.msg().msg();
-      // make a note of the error status, but keep on cancelling the other 
fragments
-      exec_state->status()->AddDetail(msg.str());
-      continue;
-    }
-    if (res.status.status_code != TErrorCode::OK) {
-      exec_state->status()->AddDetail(join(res.status.error_msgs, "; "));
-    }
+  for (BackendState* backend_state: backend_states_) {
+    DCHECK(backend_state != nullptr);
+    if (backend_state->Cancel()) ++num_cancelled;
   }
   VLOG_QUERY << Substitute(
-      "CancelFragmentInstances() query_id=$0, tried to cancel $1 fragment 
instances",
-      PrintId(query_id_), num_cancelled);
+      "CancelBackends() query_id=$0, tried to cancel $1 backends",
+      PrintId(query_id()), num_cancelled);
+  backend_completion_cv_.notify_all();
 
-  // Notify that we completed with an error.
-  instance_completion_cv_.notify_all();
+  // Report the summary with whatever progress the query made before being cancelled.
+  ComputeQuerySummary();
 }
 
-Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& 
params) {
-  VLOG_FILE << "UpdateFragmentExecStatus() "
-            << " instance=" << PrintId(params.fragment_instance_id)
-            << " status=" << params.status.status_code
-            << " done=" << (params.done ? "true" : "false");
-  int instance_state_idx = GetInstanceIdx(params.fragment_instance_id);
-  if (instance_state_idx >= fragment_instance_states_.size()) {
+Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
+  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
+  if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Unknown fragment instance index $0 (max known: $1)",
-            instance_state_idx, fragment_instance_states_.size() - 1));
-  }
-  InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
-
-  const TRuntimeProfileTree& cumulative_profile = params.profile;
-  Status status(params.status);
-  {
-    lock_guard<mutex> l(*exec_state->lock());
-    if (!status.ok()) {
-      // During query cancellation, exec_state is set to CANCELLED. However, 
we might
-      // process a non-error message from a fragment executor that is sent
-      // before query cancellation is invoked. Make sure we don't go from 
error status to
-      // OK.
-      exec_state->set_status(status);
-    }
-    exec_state->set_done(params.done);
-    if (exec_state->status()->ok()) {
-      // We can't update this backend's profile if ReportQuerySummary() is 
running,
-      // because it depends on all profiles not changing during its execution 
(when it
-      // calls SortChildren()). ReportQuerySummary() only gets called after
-      // WaitForAllInstances() returns or at the end of 
CancelFragmentInstances().
-      // WaitForAllInstances() only returns after all backends have completed 
(in which
-      // case we wouldn't be in this function), or when there's an error, in 
which case
-      // CancelFragmentInstances() is called. CancelFragmentInstances sets all
-      // exec_state's statuses to cancelled.
-      // TODO: We're losing this profile information. Call ReportQuerySummary 
only after
-      // all backends have completed.
-      exec_state->profile()->Update(cumulative_profile);
-
-      // Update the average profile for the fragment corresponding to this 
instance.
-      exec_state->profile()->ComputeTimeInProfile();
-      UpdateAverageProfile(exec_state);
-      UpdateExecSummary(*exec_state);
-    }
-    if (!exec_state->SetProfileCreated()) {
-      CollectScanNodeCounters(exec_state->profile(), 
exec_state->aggregate_counters());
-    }
-
-    // Log messages aggregated by type
-    if (params.__isset.error_log && params.error_log.size() > 0) {
-      // Append the log messages from each update with the global state of the 
query
-      // execution
-      MergeErrorMaps(exec_state->error_log(), params.error_log);
-      VLOG_FILE << "instance_id=" << exec_state->fragment_instance_id()
-                << " error log: " << 
PrintErrorMapToString(*exec_state->error_log());
-    }
-    progress_.Update(exec_state->UpdateNumScanRangesCompleted());
+        Substitute("Unknown backend index $0 (max known: $1)",
+            params.coord_state_idx, backend_states_.size() - 1));
   }
+  BackendState* backend_state = backend_states_[params.coord_state_idx];
+  // ignore stray exec reports if we're already done, otherwise we lose
+  // track of num_remaining_backends_
+  if (backend_state->IsDone()) return Status::OK();
+  // TODO: return here if returned_all_results_?
+  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
+  // path more irregular.
 
-  if (params.done && params.__isset.insert_exec_status) {
-    lock_guard<mutex> l(lock_);
-    // Merge in table update data (partitions written to, files to be moved as 
part of
-    // finalization)
-    for (const PartitionStatusMap::value_type& partition:
-         params.insert_exec_status.per_partition_status) {
-      TInsertPartitionStatus* status = 
&(per_partition_status_[partition.first]);
-      status->__set_num_modified_rows(
-          status->num_modified_rows + partition.second.num_modified_rows);
-      status->__set_kudu_latest_observed_ts(std::max(
-          partition.second.kudu_latest_observed_ts, 
status->kudu_latest_observed_ts));
-      status->__set_id(partition.second.id);
-      status->__set_partition_base_dir(partition.second.partition_base_dir);
-
-      if (partition.second.__isset.stats) {
-        if (!status->__isset.stats) status->__set_stats(TInsertStats());
-        DataSink::MergeDmlStats(partition.second.stats, &status->stats);
-      }
-    }
-    files_to_move_.insert(
-        params.insert_exec_status.files_to_move.begin(),
-        params.insert_exec_status.files_to_move.end());
-  }
+  bool done;
+  backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_, &done);
 
-  if (VLOG_FILE_IS_ON) {
-    stringstream s;
-    exec_state->profile()->PrettyPrint(&s);
-    VLOG_FILE << "profile for instance_id=" << 
exec_state->fragment_instance_id()
-              << "\n" << s.str();
-  }
-  // also print the cumulative profile
-  // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed
-  if (VLOG_FILE_IS_ON) {
-    stringstream s;
-    query_profile_->PrettyPrint(&s);
-    VLOG_FILE << "cumulative profile for query_id=" << query_id_
-              << "\n" << s.str();
+  // TODO: only do this when the sink is done; probably missing a done field
+  // in TReportExecStatus for that
+  if (params.__isset.insert_exec_status) {
+    UpdateInsertExecStatus(params.insert_exec_status);
   }
 
-  // for now, abort the query if we see any error except if the error is 
cancelled
-  // and returned_all_results_ is true.
+  // for now, abort the query if we see any error except if returned_all_results_ is true
   // (UpdateStatus() initiates cancellation, if it hasn't already been)
-  if (!(returned_all_results_ && status.IsCancelled()) && !status.ok()) {
-    UpdateStatus(status, exec_state->fragment_instance_id(),
-        TNetworkAddressToString(exec_state->impalad_address()));
+  // TODO: clarify control flow here, it's unclear we should even process this status
+  // report if returned_all_results_ is true
+  TUniqueId failed_instance_id;
+  Status status = backend_state->GetStatus(&failed_instance_id);
+  if (!status.ok() && !returned_all_results_) {
+    Status ignored = UpdateStatus(status, failed_instance_id,
+        TNetworkAddressToString(backend_state->impalad_address()));
     return Status::OK();
   }
 
-  if (params.done) {
+  if (done) {
     lock_guard<mutex> l(lock_);
-    exec_state->stopwatch()->Stop();
-    DCHECK_GT(num_remaining_fragment_instances_, 0);
-    VLOG_QUERY << "Fragment instance completed:"
-        << " id=" << PrintId(exec_state->fragment_instance_id())
-        << " host=" << exec_state->impalad_address()
-        << " remaining=" << num_remaining_fragment_instances_ - 1;
-    if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) {
+    DCHECK_GT(num_remaining_backends_, 0);
+    VLOG_QUERY << "Backend completed: "
+        << " host=" << backend_state->impalad_address()
+        << " remaining=" << num_remaining_backends_ - 1;
+    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
       // print host/port info for the first backend that's still in progress as a
       // debugging aid for backend deadlocks
-      for (InstanceState* exec_state: fragment_instance_states_) {
-        lock_guard<mutex> l2(*exec_state->lock());
-        if (!exec_state->done()) {
-          VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress 
backend: "
-                     << exec_state->impalad_address();
+      for (BackendState* backend_state: backend_states_) {
+        if (!backend_state->IsDone()) {
+          VLOG_QUERY << "query_id=" << query_id() << ": first in-progress backend: "
+                     << backend_state->impalad_address();
           break;
         }
       }
     }
-    if (--num_remaining_fragment_instances_ == 0) {
-      instance_completion_cv_.notify_all();
+    if (--num_remaining_backends_ == 0 || !status.ok()) {
+      backend_completion_cv_.notify_all();
     }
   }
 
   return Status::OK();
 }
 
+void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition:
+       insert_exec_status.per_partition_status) {
+    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+    status->__set_num_modified_rows(
+        status->num_modified_rows + partition.second.num_modified_rows);
+    status->__set_kudu_latest_observed_ts(std::max(
+        partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
+    status->__set_id(partition.second.id);
+    status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+    if (partition.second.__isset.stats) {
+      if (!status->__isset.stats) status->__set_stats(TInsertStats());
+      DataSink::MergeDmlStats(partition.second.stats, &status->stats);
+    }
+  }
+  files_to_move_.insert(
+      insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end());
+}
+
+
 uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
   uint64_t max_ts = 0;
   for (const auto& entry : per_partition_status_) {
@@ -1553,7 +1026,7 @@ uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
 }
 
 RuntimeState* Coordinator::runtime_state() {
-  return executor() == NULL ? NULL : executor()->runtime_state();
+  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }
 
 bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
@@ -1567,330 +1040,46 @@ bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
   return catalog_update->created_partitions.size() != 0;
 }
 
-// Comparator to order RuntimeProfiles by descending total time
-typedef struct {
-  typedef pair<RuntimeProfile*, bool> Profile;
-  bool operator()(const Profile& a, const Profile& b) const {
-    // Reverse ordering: we want the longest first
-    return
-        a.first->total_time_counter()->value() > 
b.first->total_time_counter()->value();
-  }
-} InstanceComparator;
-
-void Coordinator::UpdateAverageProfile(InstanceState* instance_state) {
-  FragmentIdx fragment_idx = instance_state->fragment_idx();
-  DCHECK_GE(fragment_idx, 0);
-  DCHECK_LT(fragment_idx, fragment_profiles_.size());
-  PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
-
-  // No locks are taken since UpdateAverage() and AddChild() take their own 
locks
-  if (data->averaged_profile != nullptr) {
-    data->averaged_profile->UpdateAverage(instance_state->profile());
-  }
-  data->root_profile->AddChild(instance_state->profile());
-}
-
-void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) {
-  FragmentIdx fragment_idx = instance_state->fragment_idx();
-  DCHECK_GE(fragment_idx, 0);
-  DCHECK_LT(fragment_idx, fragment_profiles_.size());
-  PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
-
-  int64_t completion_time = instance_state->stopwatch()->ElapsedTime();
-  data->completion_times(completion_time);
-  data->rates(instance_state->total_split_size()
-      / (completion_time / 1000.0 / 1000.0 / 1000.0));
-
-  // Add the child in case it has not been added previously
-  // via UpdateAverageProfile(). AddChild() will do nothing if the child
-  // already exists.
-  data->root_profile->AddChild(instance_state->profile());
-}
-
-void Coordinator::UpdateExecSummary(const InstanceState& instance_state) {
-  vector<RuntimeProfile*> children;
-  instance_state.profile()->GetAllChildren(&children);
-
-  lock_guard<SpinLock> l(exec_summary_lock_);
-  for (int i = 0; i < children.size(); ++i) {
-    int node_id = ExecNode::GetNodeIdFromProfile(children[i]);
-    if (node_id == -1) continue;
-
-    TPlanNodeExecSummary& exec_summary =
-        exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]];
-    DCHECK_LT(instance_state.per_fragment_instance_idx(), 
exec_summary.exec_stats.size());
-    DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
-        exec_summary.exec_stats.size());
-    TExecStats& stats =
-        exec_summary.exec_stats[instance_state.per_fragment_instance_idx()];
-
-    RuntimeProfile::Counter* rows_counter = 
children[i]->GetCounter("RowsReturned");
-    RuntimeProfile::Counter* mem_counter = 
children[i]->GetCounter("PeakMemoryUsage");
-    if (rows_counter != NULL) stats.__set_cardinality(rows_counter->value());
-    if (mem_counter != NULL) stats.__set_memory_used(mem_counter->value());
-    stats.__set_latency_ns(children[i]->local_time());
-    // TODO: we don't track cpu time per node now. Do that.
-    exec_summary.__isset.exec_stats = true;
-  }
-  VLOG(2) << PrintExecSummary(exec_summary_);
-}
-
-// This function appends summary information to the query_profile_ before
-// outputting it to VLOG.  It adds:
-//   1. Averaged fragment instance profiles (TODO: add outliers)
-//   2. Summary of fragment instance durations (min, max, mean, stddev)
-//   3. Summary of fragment instance rates (min, max, mean, stddev)
 // TODO: add histogram/percentile
-void Coordinator::ReportQuerySummary() {
+void Coordinator::ComputeQuerySummary() {
   // In this case, the query did not even get to start all fragment instances.
   // Some of the state that is used below might be uninitialized.  In this case,
   // the query has made so little progress, reporting a summary is not very useful.
   if (!has_called_wait_) return;
 
-  if (!fragment_instance_states_.empty()) {
-    // Average all fragment instances for each fragment.
-    for (InstanceState* state: fragment_instance_states_) {
-      state->profile()->ComputeTimeInProfile();
-      UpdateAverageProfile(state);
-      // Skip coordinator fragment, if one exists.
-      // TODO: Can we remove the special casing here?
-      if (coord_instance_ == nullptr || state->fragment_idx() != 0) {
-        ComputeFragmentSummaryStats(state);
-      }
-      UpdateExecSummary(*state);
-    }
+  if (backend_states_.empty()) return;
+  // make sure fragment_stats_ are up-to-date
+  for (BackendState* backend_state: backend_states_) {
+    backend_state->UpdateExecStats(fragment_stats_);
+  }
 
-    InstanceComparator comparator;
-    // Per fragment instances have been collected, output summaries
-    for (int i = (executor() != NULL ? 1 : 0); i < fragment_profiles_.size(); 
++i) {
-      fragment_profiles_[i].root_profile->SortChildren(comparator);
-      SummaryStats& completion_times = fragment_profiles_[i].completion_times;
-      SummaryStats& rates = fragment_profiles_[i].rates;
-
-      stringstream times_label;
-      times_label
-        << "min:" << PrettyPrinter::Print(
-            accumulators::min(completion_times), TUnit::TIME_NS)
-        << "  max:" << PrettyPrinter::Print(
-            accumulators::max(completion_times), TUnit::TIME_NS)
-        << "  mean: " << PrettyPrinter::Print(
-            accumulators::mean(completion_times), TUnit::TIME_NS)
-        << "  stddev:" << PrettyPrinter::Print(
-            sqrt(accumulators::variance(completion_times)), TUnit::TIME_NS);
-
-      stringstream rates_label;
-      rates_label
-        << "min:" << PrettyPrinter::Print(
-            accumulators::min(rates), TUnit::BYTES_PER_SECOND)
-        << "  max:" << PrettyPrinter::Print(
-            accumulators::max(rates), TUnit::BYTES_PER_SECOND)
-        << "  mean:" << PrettyPrinter::Print(
-            accumulators::mean(rates), TUnit::BYTES_PER_SECOND)
-        << "  stddev:" << PrettyPrinter::Print(
-            sqrt(accumulators::variance(rates)), TUnit::BYTES_PER_SECOND);
-
-      fragment_profiles_[i].averaged_profile->AddInfoString(
-          "completion times", times_label.str());
-      fragment_profiles_[i].averaged_profile->AddInfoString(
-          "execution rates", rates_label.str());
-      fragment_profiles_[i].averaged_profile->AddInfoString(
-          "num instances", 
lexical_cast<string>(fragment_profiles_[i].num_instances));
-    }
+  for (FragmentStats* fragment_stats: fragment_stats_) {
+    fragment_stats->AddSplitStats();
+    // TODO: output the split info string and detailed stats to VLOG_FILE again?
+    fragment_stats->AddExecStats();
+  }
 
-    // Add per node peak memory usage as InfoString
-    // Map from Impalad address to peak memory usage of this query
-    typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
-    PerNodePeakMemoryUsage per_node_peak_mem_usage;
-    for (InstanceState* state: fragment_instance_states_) {
-      int64_t initial_usage = 0;
-      int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
-          state->impalad_address(), initial_usage);
-      RuntimeProfile::Counter* mem_usage_counter =
-          
state->profile()->GetCounter(PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER);
-      if (mem_usage_counter != NULL && mem_usage_counter->value() > 
*mem_usage) {
-        per_node_peak_mem_usage[state->impalad_address()] = 
mem_usage_counter->value();
-      }
-    }
-    stringstream info;
-    for (PerNodePeakMemoryUsage::value_type entry: per_node_peak_mem_usage) {
-      info << entry.first << "("
-           << PrettyPrinter::Print(entry.second, TUnit::BYTES) << ") ";
-    }
-    query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
+  stringstream info;
+  for (BackendState* backend_state: backend_states_) {
+    info << backend_state->impalad_address() << "("
+         << PrettyPrinter::Print(backend_state->GetPeakConsumption(), TUnit::BYTES)
+         << ") ";
   }
+  query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
 }
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  for (InstanceState* state: fragment_instance_states_) {
-    lock_guard<mutex> l(*state->lock());
-    if (state->error_log()->size() > 0)  MergeErrorMaps(&merged, 
*state->error_log());
+  for (BackendState* state: backend_states_) {
+    state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
-void Coordinator::SetExecPlanFragmentParams(
-    const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) {
-  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
-  rpc_params->__set_query_ctx(query_ctx_);
-
-  TPlanFragmentCtx fragment_ctx;
-  TPlanFragmentInstanceCtx fragment_instance_ctx;
-
-  fragment_ctx.__set_fragment(params.fragment());
-  SetExecPlanDescriptorTable(params.fragment(), rpc_params);
-
-  // Remove filters that weren't selected during filter routing table 
construction.
-  if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop 
== 0);
-    int instance_idx = GetInstanceIdx(params.instance_id);
-    for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
-      if (plan_node.__isset.runtime_filters) {
-        vector<TRuntimeFilterDesc> required_filters;
-        for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) {
-          FilterRoutingTable::iterator filter_it =
-              filter_routing_table_.find(desc.filter_id);
-          if (filter_it == filter_routing_table_.end()) continue;
-          const FilterState& f = filter_it->second;
-          if (plan_node.__isset.hash_join_node) {
-            if (f.src_fragment_instance_state_idxs().find(instance_idx) ==
-                f.src_fragment_instance_state_idxs().end()) {
-              DCHECK(desc.is_broadcast_join);
-              continue;
-            }
-          }
-          // We don't need a target-side check here, because a filter is 
either sent to
-          // all its targets or none, and the none case is handled by checking 
if the
-          // filter is in the routing table.
-          required_filters.push_back(desc);
-        }
-        plan_node.__set_runtime_filters(required_filters);
-      }
-    }
-  }
-
-  fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
-  
fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
-  fragment_instance_ctx.__set_per_exch_num_senders(
-      params.fragment_exec_params.per_exch_num_senders);
-  fragment_instance_ctx.__set_destinations(
-      params.fragment_exec_params.destinations);
-  fragment_instance_ctx.__set_sender_id(params.sender_id);
-  fragment_instance_ctx.fragment_instance_id = params.instance_id;
-  fragment_instance_ctx.per_fragment_instance_idx = 
params.per_fragment_instance_idx;
-  rpc_params->__set_fragment_ctx(fragment_ctx);
-  rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
-}
-
-void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
-    TExecPlanFragmentParams* rpc_params) {
-  DCHECK(rpc_params->__isset.query_ctx);
-  TDescriptorTable thrift_desc_tbl;
-
-  // Always add the Tuple and Slot descriptors.
-  thrift_desc_tbl.__set_tupleDescriptors(desc_tbl_.tupleDescriptors);
-  thrift_desc_tbl.__set_slotDescriptors(desc_tbl_.slotDescriptors);
-
-  // Collect the TTupleId(s) for ScanNode(s).
-  unordered_set<TTupleId> tuple_ids;
-  for (const TPlanNode& plan_node: fragment.plan.nodes) {
-    switch (plan_node.node_type) {
-      case TPlanNodeType::HDFS_SCAN_NODE:
-        tuple_ids.insert(plan_node.hdfs_scan_node.tuple_id);
-        break;
-      case TPlanNodeType::KUDU_SCAN_NODE:
-        tuple_ids.insert(plan_node.kudu_scan_node.tuple_id);
-        break;
-      case TPlanNodeType::HBASE_SCAN_NODE:
-        tuple_ids.insert(plan_node.hbase_scan_node.tuple_id);
-        break;
-      case TPlanNodeType::DATA_SOURCE_NODE:
-        tuple_ids.insert(plan_node.data_source_node.tuple_id);
-        break;
-      case TPlanNodeType::HASH_JOIN_NODE:
-      case TPlanNodeType::AGGREGATION_NODE:
-      case TPlanNodeType::SORT_NODE:
-      case TPlanNodeType::EMPTY_SET_NODE:
-      case TPlanNodeType::EXCHANGE_NODE:
-      case TPlanNodeType::UNION_NODE:
-      case TPlanNodeType::SELECT_NODE:
-      case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
-      case TPlanNodeType::ANALYTIC_EVAL_NODE:
-      case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
-      case TPlanNodeType::UNNEST_NODE:
-      case TPlanNodeType::SUBPLAN_NODE:
-        // Do nothing
-        break;
-      default:
-        DCHECK(false) << "Invalid node type: " << plan_node.node_type;
-    }
-  }
-
-  // Collect TTableId(s) matching the TTupleId(s).
-  unordered_set<TTableId> table_ids;
-  for (const TTupleId& tuple_id: tuple_ids) {
-    for (const TTupleDescriptor& tuple_desc: desc_tbl_.tupleDescriptors) {
-      if (tuple_desc.__isset.tableId &&  tuple_id == tuple_desc.id) {
-        table_ids.insert(tuple_desc.tableId);
-      }
-    }
-  }
-
-  // Collect the tableId for the table sink.
-  if (fragment.__isset.output_sink && fragment.output_sink.__isset.table_sink
-      && fragment.output_sink.type == TDataSinkType::TABLE_SINK) {
-    table_ids.insert(fragment.output_sink.table_sink.target_table_id);
-  }
-
-  // For DataStreamSinks that partition according to the partitioning scheme 
of a Kudu
-  // table, we need the corresponding tableId.
-  if (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
-      && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
-      && fragment.output_sink.stream_sink.output_partition.type == 
TPartitionType::KUDU) {
-    TDataPartition partition = 
fragment.output_sink.stream_sink.output_partition;
-    DCHECK_EQ(partition.partition_exprs.size(), 1);
-    DCHECK(partition.partition_exprs[0].nodes[0].__isset.kudu_partition_expr);
-    table_ids.insert(
-        
partition.partition_exprs[0].nodes[0].kudu_partition_expr.target_table_id);
-  }
-
-  // Iterate over all TTableDescriptor(s) and add the ones that are needed.
-  for (const TTableDescriptor& table_desc: desc_tbl_.tableDescriptors) {
-    if (table_ids.find(table_desc.id) == table_ids.end()) continue;
-    thrift_desc_tbl.tableDescriptors.push_back(table_desc);
-    thrift_desc_tbl.__isset.tableDescriptors = true;
-  }
-
-  rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl);
-}
-
-namespace {
-
-// Make a PublishFilter rpc to 'impalad' for given fragment_instance_id
-// and params.
-// This takes by-value parameters because we cannot guarantee that the 
originating
-// coordinator won't be destroyed while this executes.
-// TODO: switch to references when we fix the lifecycle problems of 
coordinators.
-void DistributeFilters(shared_ptr<TPublishFilterParams> params,
-    TNetworkAddress impalad, TUniqueId fragment_instance_id) {
-  Status status;
-  ImpalaBackendConnection backend_client(
-      ExecEnv::GetInstance()->impalad_client_cache(), impalad, &status);
-  if (!status.ok()) return;
-  // Make a local copy of the shared 'master' set of parameters
-  TPublishFilterParams local_params(*params);
-  local_params.dst_instance_id = fragment_instance_id;
-  local_params.__set_bloom_filter(params->bloom_filter);
-  TPublishFilterResult res;
-  backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, 
&res);
-};
-
-}
-
 // TODO: call this as soon as it's clear that we won't reference the state
 // anymore, ie, in CancelInternal() and when GetNext() hits eos
 void Coordinator::TearDown() {
-  DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
+  DCHECK(!torn_down_) << "Coordinator::TearDown() must not be called twice";
   torn_down_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
@@ -1925,7 +1114,7 @@ void Coordinator::TearDown() {
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_complete_barrier_.get() != NULL)
+  DCHECK(exec_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
   exec_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
@@ -1933,7 +1122,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 
   // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
   shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
-  unordered_set<int> target_fragment_instance_state_idxs;
+  unordered_set<int> target_fragment_idxs;
   {
     lock_guard<SpinLock> l(filter_lock_);
     FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
@@ -1971,13 +1160,11 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
-      target_fragment_instance_state_idxs.insert(
-          target.fragment_instance_state_idxs.begin(),
-          target.fragment_instance_state_idxs.end());
+      

<TRUNCATED>

Reply via email to