IMPALA-2905: Handle coordinator fragment lifecycle like all others

The plan-root fragment instance that runs on the coordinator should be
handled like all others: started via RPC and run asynchronously. Without
this, the fragment requires special-case code throughout the
coordinator, and does not show up in system metrics etc.

This patch adds a new sink type, PlanRootSink, to the root fragment
instance so that the coordinator can pull row batches that are pushed by
the root instance. The coordinator signals completion to the fragment
instance via closing the consumer side of the sink, whereupon the
instance is free to complete.

Since the root instance now runs asynchronously with respect to the
coordinator, we add several coordination methods to allow the
coordinator to wait for a point in the instance's execution to be hit -
e.g. to wait until the instance has been opened.

Done in this patch:

* Add PlanRootSink.
* Add coordination to PlanFragmentExecutor (PFE) to allow the
  coordinator to observe the fragment instance's lifecycle.
* Make FragmentMgr a singleton.
* Remove dead code from Coordinator::Wait() and elsewhere.
* Move result output exprs out of QueryExecState (QES) and into
  PlanRootSink.
* Remove special-case limit-based teardown of coordinator fragment, and
  supporting functions in PlanFragmentExecutor.
* Simplify lifecycle of PlanFragmentExecutor by separating Open() into
  Open() and Exec(), the latter of which drives the sink by reading
  rows from the plan tree.
* Add child profile to PlanFragmentExecutor to measure time spent in
  each lifecycle phase.
* Remove dependency between InitExecProfiles() and starting root
  fragment.
* Remove mostly dead-code handling of LIMIT 0 queries.
* Ensure that SET returns a result set in all cases.
* Fix test_get_log() HS2 test. Errors are only guaranteed to be visible
  after fetch calls return EOS, but the test was assuming this would
  happen after the first fetch.

Change-Id: Ibb0064ec2f085fa3a5598ea80894fb489a01e4df
Reviewed-on: http://gerrit.cloudera.org:8080/4402
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f61397f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f61397f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f61397f

Branch: refs/heads/hadoop-next
Commit: 9f61397fc4d638aa78b37db2cd5b9c35b6deed94
Parents: 05b91a9
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Oct 5 11:48:01 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Sun Oct 16 15:55:29 2016 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/data-sink.cc                        |   4 +
 be/src/exec/plan-root-sink.cc                   | 171 ++++++
 be/src/exec/plan-root-sink.h                    | 133 +++++
 be/src/runtime/coordinator.cc                   | 560 +++++++------------
 be/src/runtime/coordinator.h                    | 142 ++---
 be/src/runtime/exec-env.cc                      |  19 +-
 be/src/runtime/exec-env.h                       |   2 +
 be/src/runtime/plan-fragment-executor.cc        | 264 ++++-----
 be/src/runtime/plan-fragment-executor.h         | 149 ++---
 be/src/scheduling/query-schedule.cc             |  32 +-
 be/src/scheduling/query-schedule.h              |  27 +-
 be/src/scheduling/simple-scheduler.cc           |   5 -
 be/src/service/fragment-exec-state.cc           |   6 +-
 be/src/service/fragment-exec-state.h            |   8 +-
 be/src/service/fragment-mgr.cc                  |   8 +-
 be/src/service/impala-beeswax-server.cc         |  52 +-
 be/src/service/impala-hs2-server.cc             |  36 +-
 be/src/service/impala-internal-service.h        |  15 +-
 be/src/service/impala-server.cc                 |   5 +-
 be/src/service/impala-server.h                  |  47 --
 be/src/service/query-exec-state.cc              | 107 +---
 be/src/service/query-exec-state.h               |  11 +-
 be/src/service/query-result-set.h               |  64 +++
 be/src/testutil/in-process-servers.cc           |   4 +
 common/thrift/DataSinks.thrift                  |   7 +-
 .../org/apache/impala/analysis/QueryStmt.java   |   6 +
 .../org/apache/impala/planner/PlanRootSink.java |  39 ++
 .../java/org/apache/impala/planner/Planner.java |   2 +
 .../apache/impala/planner/PlannerContext.java   |   1 +
 .../queries/PlannerTest/aggregation.test        | 104 ++++
 .../queries/PlannerTest/analytic-fns.test       | 118 ++++
 .../PlannerTest/complex-types-file-formats.test |  14 +
 .../queries/PlannerTest/conjunct-ordering.test  |  30 +
 .../queries/PlannerTest/constant.test           |   4 +
 .../queries/PlannerTest/data-source-tables.test |  10 +
 .../PlannerTest/disable-preaggregations.test    |   4 +
 .../queries/PlannerTest/distinct-estimate.test  |   8 +
 .../queries/PlannerTest/distinct.test           |  54 ++
 .../queries/PlannerTest/empty.test              |  58 ++
 .../queries/PlannerTest/hbase.test              | 118 ++++
 .../queries/PlannerTest/hdfs.test               | 226 ++++++++
 .../queries/PlannerTest/implicit-joins.test     |  28 +
 .../queries/PlannerTest/inline-view-limit.test  |  58 ++
 .../queries/PlannerTest/inline-view.test        | 116 ++++
 .../queries/PlannerTest/join-order.test         |  72 +++
 .../queries/PlannerTest/joins.test              | 178 ++++++
 .../queries/PlannerTest/kudu-selectivity.test   |  16 +
 .../queries/PlannerTest/kudu.test               |  32 ++
 .../PlannerTest/mem-limit-broadcast-join.test   |   2 +
 .../queries/PlannerTest/nested-collections.test | 144 +++++
 .../queries/PlannerTest/nested-loop-join.test   |  12 +
 .../queries/PlannerTest/order.test              | 104 ++++
 .../queries/PlannerTest/outer-joins.test        |  54 ++
 .../PlannerTest/partition-key-scans.test        |  38 ++
 .../PlannerTest/predicate-propagation.test      |  90 +++
 .../PlannerTest/runtime-filter-propagation.test |  86 +++
 .../queries/PlannerTest/small-query-opt.test    |  42 ++
 .../queries/PlannerTest/subquery-rewrite.test   | 144 +++++
 .../queries/PlannerTest/topn.test               |  48 ++
 .../queries/PlannerTest/tpcds-all.test          | 146 +++++
 .../queries/PlannerTest/tpch-all.test           | 132 +++++
 .../queries/PlannerTest/tpch-kudu.test          |  44 ++
 .../queries/PlannerTest/tpch-nested.test        |  88 +++
 .../queries/PlannerTest/tpch-views.test         |  44 ++
 .../queries/PlannerTest/union.test              | 198 +++++++
 .../queries/PlannerTest/values.test             |  16 +
 .../queries/PlannerTest/views.test              |  48 ++
 .../queries/PlannerTest/with-clause.test        |  58 ++
 tests/custom_cluster/test_client_ssl.py         |   1 +
 tests/failure/test_failpoints.py                |   2 +-
 tests/hs2/test_hs2.py                           |  16 +-
 tests/hs2/test_json_endpoints.py                |   4 +
 tests/shell/util.py                             |   5 +-
 74 files changed, 3831 insertions(+), 910 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 571198f..fce5c81 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -77,6 +77,7 @@ add_library(Exec
   partitioned-hash-join-builder-ir.cc
   partitioned-hash-join-node.cc
   partitioned-hash-join-node-ir.cc
+  plan-root-sink.cc
   kudu-scanner.cc
   kudu-scan-node.cc
   kudu-table-sink.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index b6ec0ee..c95b854 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -26,6 +26,7 @@
 #include "exec/hdfs-table-sink.h"
 #include "exec/kudu-table-sink.h"
 #include "exec/kudu-util.h"
+#include "exec/plan-root-sink.h"
 #include "exprs/expr.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -94,6 +95,9 @@ Status DataSink::CreateDataSink(ObjectPool* pool,
       }
 
       break;
+    case TDataSinkType::PLAN_ROOT_SINK:
+      sink->reset(new PlanRootSink(row_desc, output_exprs, thrift_sink));
+      break;
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
new file mode 100644
index 0000000..bd73953
--- /dev/null
+++ b/be/src/exec/plan-root-sink.cc
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/plan-root-sink.h"
+
+#include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+#include "service/query-result-set.h"
+
+#include <memory>
+#include <boost/thread/mutex.hpp>
+
+using namespace std;
+using boost::unique_lock;
+using boost::mutex;
+
+namespace impala {
+
+const string PlanRootSink::NAME = "PLAN_ROOT_SINK";
+
+PlanRootSink::PlanRootSink(const RowDescriptor& row_desc,
+    const std::vector<TExpr>& output_exprs, const TDataSink& thrift_sink)
+  : DataSink(row_desc), thrift_output_exprs_(output_exprs) {}
+
+Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+  RETURN_IF_ERROR(
+      Expr::CreateExprTrees(state->obj_pool(), thrift_output_exprs_, 
&output_expr_ctxs_));
+  RETURN_IF_ERROR(
+      Expr::Prepare(output_expr_ctxs_, state, row_desc_, 
expr_mem_tracker_.get()));
+
+  return Status::OK();
+}
+
+Status PlanRootSink::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
+  return Status::OK();
+}
+
+namespace {
+
+/// Validates that all collection-typed slots in the given batch are set to 
NULL.
+/// See SubplanNode for details on when collection-typed slots are set to NULL.
+/// TODO: This validation will become obsolete when we can return collection 
values.
+/// We will then need a different mechanism to assert the correct behavior of 
the
+/// SubplanNode with respect to setting collection-slots to NULL.
+void ValidateCollectionSlots(const RowDescriptor& row_desc, RowBatch* batch) {
+#ifndef NDEBUG
+  if (!row_desc.HasVarlenSlots()) return;
+  for (int i = 0; i < batch->num_rows(); ++i) {
+    TupleRow* row = batch->GetRow(i);
+    for (int j = 0; j < row_desc.tuple_descriptors().size(); ++j) {
+      const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[j];
+      if (tuple_desc->collection_slots().empty()) continue;
+      for (int k = 0; k < tuple_desc->collection_slots().size(); ++k) {
+        const SlotDescriptor* slot_desc = tuple_desc->collection_slots()[k];
+        int tuple_idx = row_desc.GetTupleIdx(slot_desc->parent()->id());
+        const Tuple* tuple = row->GetTuple(tuple_idx);
+        if (tuple == NULL) continue;
+        DCHECK(tuple->IsNull(slot_desc->null_indicator_offset()));
+      }
+    }
+  }
+#endif
+}
+}
+
+Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
+  ValidateCollectionSlots(row_desc_, batch);
+  int current_batch_row = 0;
+  do {
+    unique_lock<mutex> l(lock_);
+    while (results_ == nullptr && !consumer_done_) sender_cv_.wait(l);
+    if (consumer_done_ || batch == nullptr) {
+      eos_ = true;
+      return Status::OK();
+    }
+
+    // Otherwise the consumer is ready. Fill out the rows.
+    DCHECK(results_ != nullptr);
+    // List of expr values to hold evaluated rows from the query
+    vector<void*> result_row;
+    result_row.resize(output_expr_ctxs_.size());
+
+    // List of scales for floating point values in result_row
+    vector<int> scales;
+    scales.resize(result_row.size());
+
+    int num_to_fetch = batch->num_rows() - current_batch_row;
+    if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, 
num_rows_requested_);
+    for (int i = 0; i < num_to_fetch; ++i) {
+      TupleRow* row = batch->GetRow(current_batch_row);
+      GetRowValue(row, &result_row, &scales);
+      RETURN_IF_ERROR(results_->AddOneRow(result_row, scales));
+      ++current_batch_row;
+    }
+    // Signal the consumer.
+    results_ = nullptr;
+    ExprContext::FreeLocalAllocations(output_expr_ctxs_);
+    consumer_cv_.notify_all();
+  } while (current_batch_row < batch->num_rows());
+  return Status::OK();
+}
+
+Status PlanRootSink::FlushFinal(RuntimeState* state) {
+  unique_lock<mutex> l(lock_);
+  sender_done_ = true;
+  eos_ = true;
+  consumer_cv_.notify_all();
+  return Status::OK();
+}
+
+void PlanRootSink::Close(RuntimeState* state) {
+  unique_lock<mutex> l(lock_);
+  // No guarantee that FlushFinal() has been called, so need to mark 
sender_done_ here as
+  // well.
+  sender_done_ = true;
+  consumer_cv_.notify_all();
+  // Wait for consumer to be done, in case sender tries to tear down this sink 
while the
+  // consumer is still reading from it.
+  while (!consumer_done_) sender_cv_.wait(l);
+  Expr::Close(output_expr_ctxs_, state);
+  DataSink::Close(state);
+}
+
+void PlanRootSink::CloseConsumer() {
+  unique_lock<mutex> l(lock_);
+  consumer_done_ = true;
+  sender_cv_.notify_all();
+}
+
+Status PlanRootSink::GetNext(
+    RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
+  unique_lock<mutex> l(lock_);
+  DCHECK(!consumer_done_);
+
+  results_ = results;
+  num_rows_requested_ = num_results;
+  sender_cv_.notify_all();
+
+  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l);
+  *eos = eos_;
+  RETURN_IF_ERROR(state->CheckQueryState());
+  return Status::OK();
+}
+
+void PlanRootSink::GetRowValue(
+    TupleRow* row, vector<void*>* result, vector<int>* scales) {
+  DCHECK(result->size() >= output_expr_ctxs_.size());
+  for (int i = 0; i < output_expr_ctxs_.size(); ++i) {
+    (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
+    (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
+  }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
new file mode 100644
index 0000000..cc7c045
--- /dev/null
+++ b/be/src/exec/plan-root-sink.h
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PLAN_ROOT_SINK_H
+#define IMPALA_EXEC_PLAN_ROOT_SINK_H
+
+#include "exec/data-sink.h"
+
+#include <boost/thread/condition_variable.hpp>
+
+namespace impala {
+
+class TupleRow;
+class RowBatch;
+class QueryResultSet;
+class ExprContext;
+
+/// Sink which manages the handoff between a 'sender' (a fragment instance) 
that produces
+/// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which 
consumes rows
+/// formed by computing a set of output expressions over the input batches, by 
calling
+/// GetNext(). Send() and GetNext() are called concurrently.
+///
+/// The consumer calls GetNext() with a QueryResultSet and a requested fetch
+/// size. GetNext() shares these fields with Send(), and then signals Send() 
to begin
+/// populating the result set. GetNext() returns when either the sender has 
sent all of
+/// its rows, or the requested fetch size has been satisfied.
+///
+/// Send() fills in as many rows as are requested from the current batch. When 
the batch
+/// is exhausted - which may take several calls to GetNext() - control is 
returned to the
+/// sender to produce another row batch.
+///
+/// Consumers must call CloseConsumer() when finished to allow the fragment to 
shut
+/// down. Senders must call Close() to signal to the consumer that no more 
batches will be
+/// produced.
+///
+/// The sink is thread safe up to a single producer and single consumer.
+///
+/// TODO: The consumer drives the sender in lock-step with GetNext() calls, 
forcing a
+/// context-switch on every invocation. Measure the impact of this, and 
consider moving to
+/// a fully asynchronous implementation with a queue to manage buffering 
between sender
+/// and consumer. See IMPALA-4268.
+class PlanRootSink : public DataSink {
+ public:
+  PlanRootSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
output_exprs,
+      const TDataSink& thrift_sink);
+
+  virtual std::string GetName() { return NAME; }
+
+  virtual Status Prepare(RuntimeState* state, MemTracker* tracker);
+
+  virtual Status Open(RuntimeState* state);
+
+  /// Sends a new batch. Ownership of 'batch' remains with the sender. Blocks 
until the
+  /// consumer has consumed 'batch' by calling GetNext().
+  virtual Status Send(RuntimeState* state, RowBatch* batch);
+
+  /// Sets eos and notifies consumer.
+  virtual Status FlushFinal(RuntimeState* state);
+
+  /// To be called by sender only. Signals to the consumer that no more 
batches will be
+  /// produced, then blocks until the consumer calls CloseConsumer().
+  virtual void Close(RuntimeState* state);
+
+  /// Populates 'result_set' with up to 'num_rows' rows produced by the 
fragment instance
+  /// that calls Send(). *eos is set to 'true' when there are no more rows to 
consume.
+  Status GetNext(
+      RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* 
eos);
+
+  /// Signals to the producer that the sink will no longer be used. It's an 
error to call
+  /// GetNext() after this returns. May be called more than once; only the 
first call has
+  /// any effect.
+  void CloseConsumer();
+
+  static const std::string NAME;
+
+ private:
+  /// Protects all members, including the condition variables.
+  boost::mutex lock_;
+
+  /// Waited on by the sender only. Signalled when the consumer has written 
results_ and
+  /// num_rows_requested_, and so the sender may begin satisfying that request 
for rows
+  /// from its current batch. Also signalled when CloseConsumer() is called, 
to unblock
+  /// the sender.
+  boost::condition_variable sender_cv_;
+
+  /// Waited on by the consumer only. Signalled when the sender has finished 
serving a
+  /// request for rows. Also signalled by Close() and FlushFinal() to signal 
to the
+  /// consumer that no more rows are coming.
+  boost::condition_variable consumer_cv_;
+
+  /// Signals to producer that the consumer is done, and the sink may be torn 
down.
+  bool consumer_done_ = false;
+
+  /// Signals to consumer that the sender is done, and that there are no more 
row batches
+  /// to consume.
+  bool sender_done_ = false;
+
+  /// The current result set passed to GetNext(), to fill in Send(). Not owned 
by this
+  /// sink. Reset to nullptr after Send() completes the request to signal to 
the consumer
+  /// that it can return.
+  QueryResultSet* results_ = nullptr;
+
+  /// Set by GetNext() to indicate to Send() how many rows it should write to 
results_.
+  int num_rows_requested_ = 0;
+
+  /// Set to true in Send() and FlushFinal() when the sink has finished 
producing rows.
+  bool eos_ = false;
+
+  /// Output expressions to map plan row batches onto result set rows.
+  std::vector<TExpr> thrift_output_exprs_;
+  std::vector<ExprContext*> output_expr_ctxs_;
+
+  /// Writes a single row into 'result' and 'scales' by evaluating 
output_expr_ctxs_ over
+  /// 'row'.
+  void GetRowValue(TupleRow* row, std::vector<void*>* result, 
std::vector<int>* scales);
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index df4ad7b..4214d4d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -39,22 +39,28 @@
 #include <errno.h>
 
 #include "common/logging.h"
-#include "exprs/expr.h"
 #include "exec/data-sink.h"
+#include "exec/plan-root-sink.h"
+#include "exec/scan-node.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/ImpalaInternalService.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Partitions_types.h"
+#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
-#include "runtime/data-stream-sender.h"
 #include "runtime/data-stream-mgr.h"
+#include "runtime/data-stream-sender.h"
 #include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/parallel-executor.h"
 #include "runtime/plan-fragment-executor.h"
 #include "runtime/row-batch.h"
-#include "runtime/backend-client.h"
-#include "runtime/parallel-executor.h"
 #include "runtime/tuple-row.h"
 #include "scheduling/scheduler.h"
-#include "exec/data-sink.h"
-#include "exec/scan-node.h"
+#include "service/fragment-exec-state.h"
 #include "util/bloom-filter.h"
 #include "util/container-util.h"
 #include "util/counting-barrier.h"
@@ -67,12 +73,6 @@
 #include "util/summary-util.h"
 #include "util/table-printer.h"
 #include "util/uid-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "gen-cpp/Partitions_types.h"
-#include "gen-cpp/ImpalaInternalService_constants.h"
 
 #include "common/names.h"
 
@@ -240,8 +240,8 @@ class Coordinator::FragmentInstanceState {
   mutex lock_;
 
   /// If the status indicates an error status, execution of this fragment has 
either been
-  /// aborted by the remote impalad (which then reported the error) or 
cancellation has
-  /// been initiated; either way, execution must not be cancelled
+  /// aborted by the executing impalad (which then reported the error) or 
cancellation has
+  /// been initiated; either way, execution must not be cancelled.
   Status status_;
 
   /// If true, ExecPlanFragment() rpc has been sent.
@@ -377,15 +377,13 @@ Coordinator::Coordinator(const QuerySchedule& schedule, 
ExecEnv* exec_env,
     exec_env_(exec_env),
     has_called_wait_(false),
     returned_all_results_(false),
-    executor_(NULL), // Set in Prepare()
     query_mem_tracker_(), // Set in Exec()
     num_remaining_fragment_instances_(0),
     obj_pool_(new ObjectPool()),
     query_events_(events),
     filter_routing_table_complete_(false),
-    filter_mode_(schedule.query_options().runtime_filter_mode),
-    torn_down_(false) {
-}
+    filter_mode_(schedule_.query_options().runtime_filter_mode),
+    torn_down_(false) {}
 
 Coordinator::~Coordinator() {
   DCHECK(torn_down_) << "TearDown() must be called before Coordinator is 
destroyed";
@@ -448,14 +446,15 @@ static void ProcessQueryOptions(
       << "because nodes cannot be cancelled in Close()";
 }
 
-Status Coordinator::Exec(vector<ExprContext*>* output_expr_ctxs) {
+Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
   DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
+
   needs_finalization_ = request.__isset.finalize_params;
   if (needs_finalization_) finalize_params_ = request.finalize_params;
 
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
-      << " stmt=" << request.query_ctx.request.stmt;
+             << " stmt=" << request.query_ctx.request.stmt;
   stmt_type_ = request.stmt_type;
   query_id_ = schedule_.query_id();
   desc_tbl_ = request.desc_tbl;
@@ -468,18 +467,6 @@ Status Coordinator::Exec(vector<ExprContext*>* 
output_expr_ctxs) {
 
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  // After the coordinator fragment is started, it may call UpdateFilter() 
asynchronously,
-  // which waits on this barrier for completion.
-  // TODO: remove special treatment of coord fragment
-  int num_remote_instances = schedule_.GetNumRemoteFInstances();
-  if (num_remote_instances > 0) {
-    exec_complete_barrier_.reset(new CountingBarrier(num_remote_instances));
-  }
-  num_remaining_fragment_instances_ = num_remote_instances;
-
-  // TODO: move initial setup into a separate function; right now part of it
-  // (InitExecProfile()) depends on the coordinator fragment having been 
started
-
   // initialize progress updater
   const string& str = Substitute("Query $0", PrintId(query_id_));
   progress_.Init(str, schedule_.num_scan_ranges());
@@ -489,64 +476,51 @@ Status Coordinator::Exec(vector<ExprContext*>* 
output_expr_ctxs) {
   // execution at Impala daemons where it hasn't even started
   lock_guard<mutex> l(lock_);
 
-  bool has_coordinator_fragment = schedule_.GetCoordFragment() != NULL;
-  if (has_coordinator_fragment) {
-    // Start this before starting any more plan
-    // fragments, otherwise they start sending data before the local exchange 
node had a
-    // chance to register with the stream mgr.
-    // TODO: This is no longer necessary (see IMPALA-1599). Consider starting 
all
-    // fragments in the same way with no coordinator special case.
-    RETURN_IF_ERROR(PrepareCoordFragment(output_expr_ctxs));
-  } else {
-    // The coordinator instance may require a query mem tracker even if there 
is no
-    // coordinator fragment. For example, result-caching tracks memory via the 
query mem
-    // tracker.
-    // If there is a fragment, the fragment executor created above initializes 
the query
-    // mem tracker. If not, the query mem tracker is created here.
-    int64_t query_limit = -1;
-    if (query_ctx_.request.query_options.__isset.mem_limit &&
-        query_ctx_.request.query_options.mem_limit > 0) {
-      query_limit = query_ctx_.request.query_options.mem_limit;
-    }
-    MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
-        schedule_.request_pool(), exec_env_->process_mem_tracker());
-    query_mem_tracker_ =
-        MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
-
-    executor_.reset(NULL);
-  }
+  // The coordinator may require a query mem tracker for result-caching, which 
tracks
+  // memory via the query mem tracker.
+  int64_t query_limit = -1;
+  if (query_ctx_.request.query_options.__isset.mem_limit
+      && query_ctx_.request.query_options.mem_limit > 0) {
+    query_limit = query_ctx_.request.query_options.mem_limit;
+  }
+  MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
+      schedule_.request_pool(), exec_env_->process_mem_tracker());
+  query_mem_tracker_ =
+      MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
+  DCHECK(query_mem_tracker() != nullptr);
   filter_mem_tracker_.reset(
       new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), 
false));
 
-  // initialize execution profile structures
+  // Initialize the execution profile structures.
   bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
   if (is_mt_execution) {
     MtInitExecProfiles();
     MtInitExecSummary();
+    MtStartFInstances();
   } else {
     InitExecProfile(request);
+    StartFragments();
   }
 
-  if (num_remote_instances > 0) {
-    // pre-size fragment_instance_states_ in order to directly address by 
instance idx
-    // when creating FragmentInstanceStates (instead of push_back())
-    int num_fragment_instances = schedule_.GetTotalFInstances();
-    DCHECK_GT(num_fragment_instances, 0);
-    fragment_instance_states_.resize(num_fragment_instances);
+  RETURN_IF_ERROR(FinishInstanceStartup());
 
-    if (is_mt_execution) {
-      MtStartRemoteFInstances();
-    } else {
-      StartRemoteFragments();
-    }
-    RETURN_IF_ERROR(FinishRemoteInstanceStartup());
+  // Grab executor and wait until Prepare() has finished so that runtime state 
etc. will
+  // be set up.
+  if (schedule_.GetCoordFragment() != nullptr) {
+    // Coordinator fragment instance has same ID as query.
+    shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance =
+        
ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_);
+    DCHECK(root_fragment_instance.get() != nullptr);
+    executor_ = root_fragment_instance->executor();
 
-    // If we have a coordinator fragment and remote fragments (the common 
case), release
-    // the thread token on the coordinator fragment. This fragment spends most 
of the time
-    // waiting and doing very little work. Holding on to the token causes 
underutilization
-    // of the machine. If there are 12 queries on this node, that's 12 tokens 
reserved for
-    // no reason.
-    if (has_coordinator_fragment) executor_->ReleaseThreadToken();
+    // When WaitForPrepare() returns OK(), the executor's root sink will be 
set up. At
+    // that point, the coordinator must be sure to call 
root_sink()->CloseConsumer(); the
+    // fragment instance's executor will not complete until that point.
+    // TODO: Consider moving this to Wait().
+    Status prepare_status = executor_->WaitForPrepare();
+    root_sink_ = executor_->root_sink();
+    RETURN_IF_ERROR(prepare_status);
+    DCHECK(root_sink_ != nullptr);
   }
 
   PrintFragmentInstanceInfo();
@@ -610,60 +584,14 @@ void Coordinator::UpdateFilterRoutingTable(const 
vector<TPlanNode>& plan_nodes,
   }
 }
 
-Status Coordinator::PrepareCoordFragment(vector<ExprContext*>* 
output_expr_ctxs) {
-  const TQueryExecRequest& request = schedule_.request();
-  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
-  if (!is_mt_execution && filter_mode_ != TRuntimeFilterMode::OFF) {
-    UpdateFilterRoutingTable(schedule_.GetCoordFragment()->plan.nodes, 1, 0);
-    if (schedule_.GetNumFragmentInstances() == 0) 
MarkFilterRoutingTableComplete();
-  }
+void Coordinator::StartFragments() {
+  int num_fragment_instances = schedule_.GetNumFragmentInstances();
+  DCHECK_GT(num_fragment_instances, 0);
 
-  // create rpc params and FragmentInstanceState for the coordinator fragment
-  TExecPlanFragmentParams rpc_params;
-  FragmentInstanceState* coord_state = nullptr;
-  if (is_mt_execution) {
-    const FInstanceExecParams& coord_params = 
schedule_.GetCoordInstanceExecParams();
-    MtSetExecPlanFragmentParams(coord_params, &rpc_params);
-    coord_state = obj_pool()->Add(
-        new FragmentInstanceState(coord_params, obj_pool()));
-  } else {
-    const TPlanFragment& coord_fragment = *schedule_.GetCoordFragment();
-    SetExecPlanFragmentParams(
-        coord_fragment, schedule_.exec_params()[0], 0, &rpc_params);
-    coord_state = obj_pool()->Add(
-        new FragmentInstanceState(
-          coord_fragment.idx, schedule_.exec_params()[0], 0, obj_pool()));
-    // apparently this was never called for the coordinator fragment
-    // TODO: fix this
-    //exec_state->ComputeTotalSplitSize(
-        //rpc_params.fragment_instance_ctx.per_node_scan_ranges);
-  }
-  // register state before calling Prepare(), in case it fails
-  DCHECK_EQ(GetInstanceIdx(coord_state->fragment_instance_id()), 0);
-  fragment_instance_states_.push_back(coord_state);
-  DCHECK(coord_state != nullptr);
-  DCHECK_EQ(fragment_instance_states_.size(), 1);
-  executor_.reset(new PlanFragmentExecutor(
-      exec_env_, PlanFragmentExecutor::ReportStatusCallback()));
-  RETURN_IF_ERROR(executor_->Prepare(rpc_params));
-  coord_state->set_profile(executor_->profile());
-
-  // Prepare output_expr_ctxs before optimizing the LLVM module. The other 
exprs of this
-  // coordinator fragment have been prepared in executor_->Prepare().
-  DCHECK(output_expr_ctxs != NULL);
-  RETURN_IF_ERROR(Expr::CreateExprTrees(
-      runtime_state()->obj_pool(), schedule_.GetCoordFragment()->output_exprs,
-      output_expr_ctxs));
-  MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new 
MemTracker(
-      -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
-  RETURN_IF_ERROR(Expr::Prepare(
-      *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
+  fragment_instance_states_.resize(num_fragment_instances);
+  exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances));
+  num_remaining_fragment_instances_ = num_fragment_instances;
 
-  return Status::OK();
-}
-
-void Coordinator::StartRemoteFragments() {
-  int num_fragment_instances = schedule_.GetNumFragmentInstances();
   DebugOptions debug_options;
   ProcessQueryOptions(schedule_.query_options(), &debug_options);
   const TQueryExecRequest& request = schedule_.request();
@@ -671,19 +599,14 @@ void Coordinator::StartRemoteFragments() {
   VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances 
for query "
              << query_id_;
   query_events_->MarkEvent(
-      Substitute("Ready to start $0 remote fragment instances", 
num_fragment_instances));
+      Substitute("Ready to start $0 fragments", num_fragment_instances));
 
-  bool has_coordinator_fragment =
-      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
-  int instance_state_idx = has_coordinator_fragment ? 1 : 0;
-  int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0;
+  int instance_state_idx = 0;
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    // Populate the runtime filter routing table. This should happen before 
starting
-    // the remote fragments.
-    // This code anticipates the indices of the instance states created later 
on in
-    // ExecRemoteFragment()
-    for (int fragment_idx = first_remote_fragment_idx;
-         fragment_idx < request.fragments.size(); ++fragment_idx) {
+    // Populate the runtime filter routing table. This should happen before 
starting the
+    // fragment instances. This code anticipates the indices of the instance 
states
+    // created later on in ExecRemoteFragment()
+    for (int fragment_idx = 0; fragment_idx < request.fragments.size(); 
++fragment_idx) {
       const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
       int num_hosts = params.hosts.size();
       DCHECK_GT(num_hosts, 0);
@@ -697,8 +620,7 @@ void Coordinator::StartRemoteFragments() {
   int num_instances = 0;
   // Start one fragment instance per fragment per host (number of hosts 
running each
   // fragment may not be constant).
-  for (int fragment_idx = first_remote_fragment_idx;
-       fragment_idx < request.fragments.size(); ++fragment_idx) {
+  for (int fragment_idx = 0; fragment_idx < request.fragments.size(); 
++fragment_idx) {
     const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
     int num_hosts = params.hosts.size();
     DCHECK_GT(num_hosts, 0);
@@ -719,11 +641,17 @@ void Coordinator::StartRemoteFragments() {
   }
   exec_complete_barrier_->Wait();
   query_events_->MarkEvent(
-      Substitute("All $0 remote fragments instances started", num_instances));
+      Substitute("All $0 fragments instances started", num_instances));
 }
 
-void Coordinator::MtStartRemoteFInstances() {
+void Coordinator::MtStartFInstances() {
   int num_fragment_instances = schedule_.GetNumFragmentInstances();
+  DCHECK_GT(num_fragment_instances, 0);
+
+  fragment_instance_states_.resize(num_fragment_instances);
+  exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances));
+  num_remaining_fragment_instances_ = num_fragment_instances;
+
   DebugOptions debug_options;
   ProcessQueryOptions(schedule_.query_options(), &debug_options);
   const TQueryExecRequest& request = schedule_.request();
@@ -731,7 +659,7 @@ void Coordinator::MtStartRemoteFInstances() {
   VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances 
for query "
              << query_id_;
   query_events_->MarkEvent(
-      Substitute("Ready to start $0 remote fragment instances", 
num_fragment_instances));
+      Substitute("Ready to start $0 fragment instances", 
num_fragment_instances));
 
   // TODO: populate the runtime filter routing table
   // this requires local aggregation of filters prior to sending
@@ -739,7 +667,6 @@ void Coordinator::MtStartRemoteFInstances() {
 
   int num_instances = 0;
   for (const MtFragmentExecParams& fragment_params: 
schedule_.mt_fragment_exec_params()) {
-    if (fragment_params.is_coord_fragment) continue;
     for (int i = 0; i < fragment_params.instance_exec_params.size();
         ++i, ++num_instances) {
       const FInstanceExecParams& instance_params =
@@ -760,10 +687,10 @@ void Coordinator::MtStartRemoteFInstances() {
   VLOG_QUERY << "started " << num_fragment_instances << " fragment instances 
for query "
       << query_id_;
   query_events_->MarkEvent(
-      Substitute("All $0 remote fragment instances started", num_instances));
+      Substitute("All $0 fragment instances started", num_instances));
 }
 
-Status Coordinator::FinishRemoteInstanceStartup() {
+Status Coordinator::FinishInstanceStartup() {
   Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, 
TUnit::TIME_MS);
@@ -868,7 +795,7 @@ Status Coordinator::UpdateStatus(const Status& status, 
const TUniqueId& instance
   {
     lock_guard<mutex> l(lock_);
 
-    // The query is done and we are just waiting for remote fragments to clean 
up.
+    // The query is done and we are just waiting for fragment instances to 
clean up.
     // Ignore their cancelled updates.
     if (returned_all_results_ && status.IsCancelled()) return query_status_;
 
@@ -1187,7 +1114,8 @@ Status Coordinator::WaitForAllInstances() {
   if (query_status_.ok()) {
     VLOG_QUERY << "All fragment instances finished successfully.";
   } else {
-    VLOG_QUERY << "All fragment instances finished due to one or more errors.";
+    VLOG_QUERY << "All fragment instances finished due to one or more errors. "
+               << query_status_.GetDetail();
   }
 
   return query_status_;
@@ -1198,143 +1126,84 @@ Status Coordinator::Wait() {
   SCOPED_TIMER(query_profile_->total_time_counter());
   if (has_called_wait_) return Status::OK();
   has_called_wait_ = true;
-  Status return_status = Status::OK();
-  if (executor_.get() != NULL) {
-    // Open() may block
-    return_status = UpdateStatus(executor_->Open(),
-        runtime_state()->fragment_instance_id(), FLAGS_hostname);
-
-    if (return_status.ok()) {
-      // If the coordinator fragment has a sink, it will have finished 
executing at this
-      // point.  It's safe therefore to copy the set of files to move and 
updated
-      // partitions into the query-wide set.
-      RuntimeState* state = runtime_state();
-      DCHECK(state != NULL);
-
-      // No other instances should have updated these structures if the 
coordinator has a
-      // fragment. (Instances have a sink only if the coordinator does not)
-      DCHECK_EQ(files_to_move_.size(), 0);
-      DCHECK_EQ(per_partition_status_.size(), 0);
-
-      // Because there are no other updates, safe to copy the maps rather than 
merge them.
-      files_to_move_ = *state->hdfs_files_to_move();
-      per_partition_status_ = *state->per_partition_status();
-    }
-  } else {
-    // Query finalization can only happen when all instances have reported
-    // relevant state. They only have relevant state to report in the parallel
-    // INSERT case, otherwise all the relevant state is from the coordinator
-    // fragment which will be available after Open() returns.
-    // Ignore the returned status if finalization is required., since 
FinalizeQuery() will
-    // pick it up and needs to execute regardless.
-    Status status = WaitForAllInstances();
-    if (!needs_finalization_ && !status.ok()) return status;
-  }
 
-  // Query finalization is required only for HDFS table sinks
-  if (needs_finalization_) {
-    RETURN_IF_ERROR(FinalizeQuery());
+  if (stmt_type_ == TStmtType::QUERY) {
+    DCHECK(executor_ != nullptr);
+    return UpdateStatus(executor_->WaitForOpen(), 
runtime_state()->fragment_instance_id(),
+        FLAGS_hostname);
   }
 
-  if (stmt_type_ == TStmtType::DML) {
-    query_profile_->AddInfoString("Insert Stats",
-        DataSink::OutputInsertStats(per_partition_status_, "\n"));
-    // For DML queries, when Wait is done, the query is complete.  Report 
aggregate
-    // query profiles at this point.
-    // TODO: make sure ReportQuerySummary gets called on error
-    ReportQuerySummary();
-  }
+  DCHECK_EQ(stmt_type_, TStmtType::DML);
+  // Query finalization can only happen when all backends have reported
+  // relevant state. They only have relevant state to report in the parallel
+  // INSERT case, otherwise all the relevant state is from the coordinator
+  // fragment which will be available after Open() returns.
+  // Ignore the returned status if finalization is required, since 
FinalizeQuery() will
+  // pick it up and needs to execute regardless.
+  Status status = WaitForAllInstances();
+  if (!needs_finalization_ && !status.ok()) return status;
 
-  if (filter_routing_table_.size() > 0) {
-    query_profile_->AddInfoString("Final filter table", FilterDebugString());
-  }
+  // Query finalization is required only for HDFS table sinks
+  if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
 
-  return return_status;
+  query_profile_->AddInfoString(
+      "Insert Stats", DataSink::OutputInsertStats(per_partition_status_, 
"\n"));
+  // For DML queries, when Wait is done, the query is complete.  Report 
aggregate
+  // query profiles at this point.
+  // TODO: make sure ReportQuerySummary gets called on error
+  ReportQuerySummary();
+
+  return status;
 }
 
-Status Coordinator::GetNext(RowBatch** batch, RuntimeState* state) {
+Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   VLOG_ROW << "GetNext() query_id=" << query_id_;
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  if (executor_.get() == NULL) {
-    // If there is no local fragment, we produce no output, and execution will
-    // have finished after Wait.
-    *batch = NULL;
-    return GetStatus();
-  }
-
-  // do not acquire lock_ here, otherwise we could block and prevent an async
-  // Cancel() from proceeding
-  Status status = executor_->GetNext(batch);
+  DCHECK(root_sink_ != nullptr)
+      << "GetNext() called without result sink. Perhaps Prepare() failed and 
was not "
+      << "checked?";
+  Status status = root_sink_->GetNext(runtime_state(), results, max_rows, eos);
 
   // if there was an error, we need to return the query's error status rather 
than
   // the status we just got back from the local executor (which may well be 
CANCELLED
   // in that case).  Coordinator fragment failed in this case so we log the 
query_id.
-  RETURN_IF_ERROR(UpdateStatus(status, runtime_state()->fragment_instance_id(),
-      FLAGS_hostname));
+  RETURN_IF_ERROR(
+      UpdateStatus(status, runtime_state()->fragment_instance_id(), 
FLAGS_hostname));
 
-  if (*batch == NULL) {
+  if (*eos) {
     returned_all_results_ = true;
-    if (executor_->ReachedLimit()) {
-      // We've reached the query limit, cancel the remote fragments.  The
-      // Exchange node on our fragment is no longer receiving rows so the
-      // remote fragments must be explicitly cancelled.
-      CancelRemoteFragments();
-      RuntimeState* state = runtime_state();
-      if (state != NULL) {
-        // Cancel the streams receiving batches.  The exchange nodes that would
-        // normally read from the streams are done.
-        state->stream_mgr()->Cancel(state->fragment_instance_id());
-      }
-    }
-
-    // Don't return final NULL until all instances have completed.
-    // GetNext must wait for all instances to complete before
-    // ultimately signalling the end of execution via a NULL
-    // batch. After NULL is returned, the coordinator may tear down
-    // query state, and perform post-query finalization which might
-    // depend on the reports from all instances.
+    // Trigger tear-down of coordinator fragment by closing the consumer. Must 
do before
+    // WaitForAllInstances().
+    root_sink_->CloseConsumer();
+    root_sink_ = nullptr;
+
+    // Don't return eos to the caller until all instances have completed.  GetNext must
+    // wait for all instances to complete before ultimately signalling the end of
+    // execution via *eos. After eos is returned, the coordinator may tear down query
+    // state, and perform post-query finalization which might depend on the reports
+    // from all instances.
+    //
+    // TODO: Waiting should happen in TearDown() (and then we wouldn't need to 
call
+    // CloseConsumer() here). See IMPALA-4275 for details.
     RETURN_IF_ERROR(WaitForAllInstances());
     if (query_status_.ok()) {
       // If the query completed successfully, report aggregate query profiles.
       ReportQuerySummary();
     }
-  } else {
-#ifndef NDEBUG
-    ValidateCollectionSlots(*batch);
-#endif
   }
 
   return Status::OK();
 }
 
-void Coordinator::ValidateCollectionSlots(RowBatch* batch) {
-  const RowDescriptor& row_desc = executor_->row_desc();
-  if (!row_desc.HasVarlenSlots()) return;
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    TupleRow* row = batch->GetRow(i);
-    for (int j = 0; j < row_desc.tuple_descriptors().size(); ++j) {
-      const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[j];
-      if (tuple_desc->collection_slots().empty()) continue;
-      for (int k = 0; k < tuple_desc->collection_slots().size(); ++k) {
-        const SlotDescriptor* slot_desc = tuple_desc->collection_slots()[k];
-        int tuple_idx = row_desc.GetTupleIdx(slot_desc->parent()->id());
-        const Tuple* tuple = row->GetTuple(tuple_idx);
-        if (tuple == NULL) continue;
-        DCHECK(tuple->IsNull(slot_desc->null_indicator_offset()));
-      }
-    }
-  }
-}
-
 void Coordinator::PrintFragmentInstanceInfo() {
   for (FragmentInstanceState* state: fragment_instance_states_) {
     SummaryStats& acc = 
fragment_profiles_[state->fragment_idx()].bytes_assigned;
     acc(state->total_split_size());
   }
 
-  for (int id = (executor_.get() == NULL ? 0 : 1); id < 
fragment_profiles_.size(); ++id) {
+  for (int id = (executor_ == NULL ? 0 : 1); id < fragment_profiles_.size(); 
++id) {
     SummaryStats& acc = fragment_profiles_[id].bytes_assigned;
     double min = accumulators::min(acc);
     double max = accumulators::max(acc);
@@ -1360,6 +1229,7 @@ void Coordinator::PrintFragmentInstanceInfo() {
 
 void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
   // Initialize the structure to collect execution summary of every plan node.
+  fragment_profiles_.resize(request.fragments.size());
   exec_summary_.__isset.nodes = true;
   for (int i = 0; i < request.fragments.size(); ++i) {
     if (!request.fragments[i].__isset.plan) continue;
@@ -1394,46 +1264,30 @@ void Coordinator::InitExecProfile(const 
TQueryExecRequest& request) {
     }
   }
 
-  if (executor_.get() != NULL) {
-    // register coordinator's fragment profile now, before those of the 
backends,
-    // so it shows up at the top
-    query_profile_->AddChild(executor_->profile());
-    executor_->profile()->set_name(Substitute("Coordinator Fragment $0",
-        request.fragments[0].display_name));
-    CollectScanNodeCounters(executor_->profile(), &coordinator_counters_);
-  }
-
   // Initialize the runtime profile structure. This adds the per fragment 
average
   // profiles followed by the per fragment instance profiles.
-  bool has_coordinator_fragment =
-      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
-  fragment_profiles_.resize(request.fragments.size());
   for (int i = 0; i < request.fragments.size(); ++i) {
-    fragment_profiles_[i].num_instances = 0;
-
-    // Special case fragment idx 0 if there is a coordinator. There is only one
-    // instance of this profile so the average is just the coordinator profile.
-    if (i == 0 && has_coordinator_fragment) {
-      fragment_profiles_[i].averaged_profile = executor_->profile();
-      fragment_profiles_[i].num_instances = 1;
-      continue;
-    }
-    fragment_profiles_[i].averaged_profile =
-        obj_pool()->Add(new RuntimeProfile(obj_pool(),
-            Substitute("Averaged Fragment $0", 
request.fragments[i].display_name), true));
-    // Insert the avg profiles in ascending fragment number order. If
-    // there is a coordinator fragment, it's been placed in
-    // fragment_profiles_[0].averaged_profile, ensuring that this code
-    // will put the first averaged profile immediately after it. If
-    // there is no coordinator fragment, the first averaged profile
-    // will be inserted as the first child of query_profile_, and then
-    // all other averaged fragments will follow.
-    query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true,
-        (i > 0) ? fragment_profiles_[i-1].averaged_profile : NULL);
-
+    // Insert the avg profiles in ascending fragment number order. The coordinator
+    // fragment, if there is one, gets no averaged profile (averaged_profile is set to
+    // nullptr below), only a root profile. Every other fragment's averaged profile is
+    // inserted after the previous fragment's averaged profile; when there is none
+    // (first fragment, or the previous fragment is the coordinator fragment), it is
+    // inserted as the first child of query_profile_.
+    bool is_coordinator_fragment = (i == 0 && schedule_.GetCoordFragment() != 
nullptr);
+    string profile_name =
+        Substitute(is_coordinator_fragment ? "Coordinator Fragment $0" : 
"Fragment $0",
+            request.fragments[i].display_name);
     fragment_profiles_[i].root_profile =
-        obj_pool()->Add(new RuntimeProfile(obj_pool(),
-            Substitute("Fragment $0", request.fragments[i].display_name)));
+        obj_pool()->Add(new RuntimeProfile(obj_pool(), profile_name));
+    if (is_coordinator_fragment) {
+      fragment_profiles_[i].averaged_profile = nullptr;
+    } else {
+      fragment_profiles_[i].averaged_profile = obj_pool()->Add(new 
RuntimeProfile(
+          obj_pool(),
+          Substitute("Averaged Fragment $0", 
request.fragments[i].display_name), true));
+      query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true,
+          (i > 0) ? fragment_profiles_[i - 1].averaged_profile : NULL);
+    }
     // Note: we don't start the wall timer here for the fragment
     // profile; it's uninteresting and misleading.
     query_profile_->AddChild(fragment_profiles_[i].root_profile);
@@ -1490,40 +1344,23 @@ void Coordinator::MtInitExecProfiles() {
   schedule_.GetTPlanFragments(&fragments);
   fragment_profiles_.resize(fragments.size());
 
-  // start with coordinator fragment, if there is one
   const TPlanFragment* coord_fragment = schedule_.GetCoordFragment();
-  if (coord_fragment != NULL) {
-    DCHECK(executor_.get() != NULL);
-    PerFragmentProfileData* data = &fragment_profiles_[coord_fragment->idx];
-    data->num_instances = 1;
-    // TODO: fix this; this is not an averaged profile; we should follow the 
exact
-    // same structure we have for all other profiles (average + root + single
-    // instance profile)
-    data->averaged_profile = executor_->profile();
-
-    // register coordinator's fragment profile in the query profile now, 
before those
-    // of the fragment instances, so it shows up at the top
-    query_profile_->AddChild(executor_->profile());
-    executor_->profile()->set_name(Substitute("Coordinator Fragment $0",
-        coord_fragment->display_name));
-    CollectScanNodeCounters(executor_->profile(), &coordinator_counters_);
-  }
 
   // Initialize the runtime profile structure. This adds the per fragment 
average
   // profiles followed by the per fragment instance profiles.
   for (const TPlanFragment* fragment: fragments) {
-    if (fragment == coord_fragment) continue;
+    string profile_name =
+        (fragment == coord_fragment) ? "Coordinator Fragment $0" : "Fragment 
$0";
     PerFragmentProfileData* data = &fragment_profiles_[fragment->idx];
     data->num_instances =
         
schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
-
-    data->averaged_profile =
-        obj_pool()->Add(new RuntimeProfile(obj_pool(),
-          Substitute("Averaged Fragment $0", fragment->display_name), true));
-    query_profile_->AddChild(data->averaged_profile, true);
-    data->root_profile =
-        obj_pool()->Add(new RuntimeProfile(obj_pool(),
-          Substitute("Fragment $0", fragment->display_name)));
+    if (fragment != coord_fragment) {
+      data->averaged_profile = obj_pool()->Add(new RuntimeProfile(
+          obj_pool(), Substitute("Averaged Fragment $0", 
fragment->display_name), true));
+      query_profile_->AddChild(data->averaged_profile, true);
+    }
+    data->root_profile = obj_pool()->Add(
+        new RuntimeProfile(obj_pool(), Substitute(profile_name, 
fragment->display_name)));
     // Note: we don't start the wall timer here for the fragment profile;
     // it's uninteresting and misleading.
     query_profile_->AddChild(data->root_profile);
@@ -1637,8 +1474,6 @@ void Coordinator::ExecRemoteFragment(const 
FragmentExecParams& fragment_exec_par
             << " instance_id=" << PrintId(exec_state->fragment_instance_id())
             << " host=" << exec_state->impalad_address();
 
-  // Guard against concurrent UpdateExecStatus() that may arrive after RPC 
returns.
-  lock_guard<mutex> l(*exec_state->lock());
   int64_t start = MonotonicMillis();
 
   Status client_connect_status;
@@ -1685,32 +1520,27 @@ void Coordinator::Cancel(const Status* cause) {
   // if the query status indicates an error, cancellation has already been 
initiated
   if (!query_status_.ok()) return;
   // prevent others from cancelling a second time
+
+  // TODO: This should default to OK(), not CANCELLED if there is no cause (or 
callers
+  // should explicitly pass Status::OK()). Fragment instances may be cancelled 
at the end
+  // of a successful query. Need to clean up relationship between 
query_status_ here and
+  // in QueryExecState. See IMPALA-4279.
   query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED;
   CancelInternal();
 }
 
 void Coordinator::CancelInternal() {
   VLOG_QUERY << "Cancel() query_id=" << query_id_;
-  DCHECK(!query_status_.ok());
-
-  // cancel local fragment
-  if (executor_.get() != NULL) executor_->Cancel();
-
-  CancelRemoteFragments();
+  CancelFragmentInstances();
 
   // Report the summary with whatever progress the query made before being 
cancelled.
   ReportQuerySummary();
 }
 
-void Coordinator::CancelRemoteFragments() {
+void Coordinator::CancelFragmentInstances() {
+  int num_cancelled = 0;
   for (FragmentInstanceState* exec_state: fragment_instance_states_) {
     DCHECK(exec_state != nullptr);
-    if (exec_state->fragment_idx() == 0) continue;  // the coord fragment
-
-    // If a fragment failed before we finished issuing all remote fragments,
-    // this function will have been called before we finished populating
-    // fragment_instance_states_. Skip any such uninitialized exec states.
-    if (exec_state == NULL) continue;
 
     // lock each exec_state individually to synchronize correctly with
     // UpdateFragmentExecStatus() (which doesn't get the global lock_
@@ -1735,7 +1565,7 @@ void Coordinator::CancelRemoteFragments() {
     ImpalaBackendConnection backend_client(
         exec_env_->impalad_client_cache(), exec_state->impalad_address(), 
&status);
     if (!status.ok()) continue;
-
+    ++num_cancelled;
     TCancelPlanFragmentParams params;
     params.protocol_version = ImpalaInternalServiceVersion::V1;
     params.__set_fragment_instance_id(exec_state->fragment_instance_id());
@@ -1765,8 +1595,11 @@ void Coordinator::CancelRemoteFragments() {
       exec_state->status()->AddDetail(join(res.status.error_msgs, "; "));
     }
   }
+  VLOG_QUERY << Substitute(
+      "CancelFragmentInstances() query_id=$0, tried to cancel $1 fragment 
instances",
+      PrintId(query_id_), num_cancelled);
 
-  // notify that we completed with an error
+  // Notify that we completed with an error.
   instance_completion_cv_.notify_all();
 }
 
@@ -1799,11 +1632,11 @@ Status Coordinator::UpdateFragmentExecStatus(const 
TReportExecStatusParams& para
       // We can't update this backend's profile if ReportQuerySummary() is 
running,
       // because it depends on all profiles not changing during its execution 
(when it
       // calls SortChildren()). ReportQuerySummary() only gets called after
-      // WaitForAllInstances() returns or at the end of 
CancelRemoteFragments().
+      // WaitForAllInstances() returns or at the end of 
CancelFragmentInstances().
       // WaitForAllInstances() only returns after all backends have completed 
(in which
       // case we wouldn't be in this function), or when there's an error, in 
which case
-      // CancelRemoteFragments() is called. CancelRemoteFragments sets all 
exec_state's
-      // statuses to cancelled.
+      // CancelFragmentInstances() is called. CancelFragmentInstances sets all
+      // exec_state's statuses to cancelled.
       // TODO: We're losing this profile information. Call ReportQuerySummary 
only after
       // all backends have completed.
       exec_state->profile()->Update(cumulative_profile);
@@ -1899,18 +1732,12 @@ Status Coordinator::UpdateFragmentExecStatus(const 
TReportExecStatusParams& para
   return Status::OK();
 }
 
-const RowDescriptor& Coordinator::row_desc() const {
-  DCHECK(executor_.get() != NULL);
-  return executor_->row_desc();
-}
-
 RuntimeState* Coordinator::runtime_state() {
-  return executor_.get() == NULL ? NULL : executor_->runtime_state();
+  return executor_ == NULL ? NULL : executor_->runtime_state();
 }
 
 MemTracker* Coordinator::query_mem_tracker() {
-  return executor_.get() == NULL ? query_mem_tracker_.get() :
-      executor_->runtime_state()->query_mem_tracker();
+  return query_mem_tracker_.get();
 }
 
 bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
@@ -1941,7 +1768,9 @@ void 
Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
   PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
 
   // No locks are taken since UpdateAverage() and AddChild() take their own 
locks
-  data->averaged_profile->UpdateAverage(instance_state->profile());
+  if (data->averaged_profile != nullptr) {
+    data->averaged_profile->UpdateAverage(instance_state->profile());
+  }
   data->root_profile->AddChild(instance_state->profile());
 }
 
@@ -2000,18 +1829,18 @@ void Coordinator::UpdateExecSummary(const 
FragmentInstanceState& instance_state)
 
 // This function appends summary information to the query_profile_ before
 // outputting it to VLOG.  It adds:
-//   1. Averaged remote fragment profiles (TODO: add outliers)
-//   2. Summary of remote fragment durations (min, max, mean, stddev)
-//   3. Summary of remote fragment rates (min, max, mean, stddev)
+//   1. Averaged fragment instance profiles (TODO: add outliers)
+//   2. Summary of fragment instance durations (min, max, mean, stddev)
+//   3. Summary of fragment instance rates (min, max, mean, stddev)
 // TODO: add histogram/percentile
 void Coordinator::ReportQuerySummary() {
-  // In this case, the query did not even get to start on all the remote nodes,
-  // some of the state that is used below might be uninitialized.  In this 
case,
+  // In this case, the query did not even get to start all fragment instances.
+  // Some of the state that is used below might be uninitialized.  In this 
case,
   // the query has made so little progress, reporting a summary is not very 
useful.
   if (!has_called_wait_) return;
 
   if (!fragment_instance_states_.empty()) {
-    // Average all remote fragments for each fragment.
+    // Average all fragment instances for each fragment.
     for (FragmentInstanceState* state: fragment_instance_states_) {
       // TODO: make profiles uniform across all fragments so we don't have
       // to keep special-casing the coord fragment
@@ -2028,7 +1857,7 @@ void Coordinator::ReportQuerySummary() {
 
     InstanceComparator comparator;
     // Per fragment instances have been collected, output summaries
-    for (int i = (executor_.get() != NULL ? 1 : 0); i < 
fragment_profiles_.size(); ++i) {
+    for (int i = (executor_ != NULL ? 1 : 0); i < fragment_profiles_.size(); 
++i) {
       fragment_profiles_[i].root_profile->SortChildren(comparator);
       SummaryStats& completion_times = fragment_profiles_[i].completion_times;
       SummaryStats& rates = fragment_profiles_[i].rates;
@@ -2088,16 +1917,6 @@ void Coordinator::ReportQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  {
-    lock_guard<mutex> l(lock_);
-    // TODO-MT: use FragmentInstanceState::error_log_ instead
-    // as part of getting rid of the special-casing of the coordinator instance
-    if (executor_.get() != NULL && executor_->runtime_state() != NULL) {
-      ErrorLogMap runtime_error_log;
-      executor_->runtime_state()->GetErrors(&runtime_error_log);
-      MergeErrorMaps(&merged, runtime_error_log);
-    }
-  }
   for (FragmentInstanceState* state: fragment_instance_states_) {
     lock_guard<mutex> l(*state->lock());
     if (state->error_log()->size() > 0)  MergeErrorMaps(&merged, 
*state->error_log());
@@ -2285,11 +2104,20 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> 
params,
 void Coordinator::TearDown() {
   DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
   torn_down_ = true;
-  lock_guard<SpinLock> l(filter_lock_);
-  for (auto& filter: filter_routing_table_) {
-    FilterState* state = &filter.second;
-    state->Disable(filter_mem_tracker_.get());
+  if (filter_routing_table_.size() > 0) {
+    query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
+
+  {
+    lock_guard<SpinLock> l(filter_lock_);
+    for (auto& filter : filter_routing_table_) {
+      FilterState* state = &filter.second;
+      state->Disable(filter_mem_tracker_.get());
+    }
+  }
+
+  // Need to protect against failed Prepare(), where root_sink() would not be 
set.
+  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index bb67377..f73cf42 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -34,16 +34,18 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 
+#include "common/global-types.h"
 #include "common/hdfs.h"
 #include "common/status.h"
-#include "common/global-types.h"
-#include "util/progress-updater.h"
-#include "util/histogram-metric.h"
-#include "util/runtime-profile.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/Types_types.h"
 #include "runtime/runtime-state.h"
 #include "scheduling/simple-scheduler.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/Frontend_types.h"
+#include "service/fragment-exec-state.h"
+#include "service/fragment-mgr.h"
+#include "util/histogram-metric.h"
+#include "util/progress-updater.h"
+#include "util/runtime-profile.h"
 
 namespace impala {
 
@@ -67,28 +69,33 @@ class TRuntimeProfileTree;
 class RuntimeProfile;
 class TablePrinter;
 class TPlanFragment;
+class QueryResultSet;
 
 struct DebugOptions;
 
-/// Query coordinator: handles execution of plan fragments on remote nodes, 
given
-/// a TQueryExecRequest. As part of that, it handles all interactions with the
-/// executing backends; it is also responsible for implementing all client 
requests
-/// regarding the query, including cancellation.
-/// The coordinator fragment is executed locally in the calling thread, all 
other
-/// fragments are sent to remote nodes. The coordinator also monitors
-/// the execution status of the remote fragments and aborts the entire query 
if an error
-/// occurs, either in any of the remote fragments or in the local fragment.
+/// Query coordinator: handles execution of fragment instances on remote 
nodes, given a
+/// TQueryExecRequest. As part of that, it handles all interactions with the 
executing
+/// backends; it is also responsible for implementing all client requests 
regarding the
+/// query, including cancellation.
+///
+/// The coordinator monitors the execution status of fragment instances and 
aborts the
+/// entire query if an error is reported by any of them.
+///
+/// Queries that have results have those results fetched by calling GetNext(). 
Result
+/// rows are produced by a fragment instance that always executes on the same 
machine as
+/// the coordinator.
+///
 /// Once a query has finished executing and all results have been returned 
either to the
 /// caller of GetNext() or a data sink, execution_completed() will return 
true. If the
-/// query is aborted, execution_completed should also be set to true.
-/// Coordinator is thread-safe, with the exception of GetNext().
+/// query is aborted, execution_completed should also be set to true. 
Coordinator is
+/// thread-safe, with the exception of GetNext().
 //
 /// A typical sequence of calls for a single query (calls under the same 
numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateFragmentExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateFragmentExecStatus()
-//
+///
 /// The implementation ensures that setting an overall error status and 
initiating
 /// cancellation of local and all remote fragments is atomic.
 ///
@@ -104,14 +111,10 @@ class Coordinator {
       RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. 
Returns as soon
-  /// as all plan fragments have started executing at their respective 
backends.
-  /// 'schedule' must contain at least a coordinator plan fragment (ie, can't
-  /// be for a query like 'SELECT 1').
-  /// Populates and prepares output_expr_ctxs from the coordinator's fragment 
if there is
-  /// one, and LLVM optimizes them together with the fragment's other exprs.
+  /// Initiate asynchronous execution of a query with the given schedule. When 
it returns,
+  /// all fragment instances have started executing at their respective 
backends.
   /// A call to Exec() must precede all other member function calls.
-  Status Exec(std::vector<ExprContext*>* output_expr_ctxs);
+  Status Exec();
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if 
the
   /// query doesn't return rows, until the query finishes or is cancelled.
@@ -120,25 +123,25 @@ class Coordinator {
   /// Wait() calls concurrently.
   Status Wait();
 
-  /// Returns tuples from the coordinator fragment. Any returned tuples are 
valid until
-  /// the next GetNext() call. If *batch is NULL, execution has completed and 
GetNext()
-  /// must not be called again.
-  /// GetNext() will not set *batch=NULL until all fragment instances have
-  /// either completed or have failed.
-  /// It is safe to call GetNext() even in the case where there is no 
coordinator fragment
-  /// (distributed INSERT).
-  /// '*batch' is owned by the underlying PlanFragmentExecutor and must not be 
deleted.
-  /// *state is owned by the caller, and must not be deleted.
-  /// Returns an error status if an error was encountered either locally or by
-  /// any of the remote fragments or if the query was cancelled.
-  /// GetNext() is not thread-safe: multiple threads must not make concurrent
-  /// GetNext() calls (but may call any of the other member functions 
concurrently
-  /// with GetNext()).
-  Status GetNext(RowBatch** batch, RuntimeState* state);
+  /// Fills 'results' with up to 'max_rows' rows. May return fewer than 
'max_rows'
+  /// rows, but will not return more.
+  ///
+  /// If *eos is true, execution has completed and GetNext() must not be called
+  /// again.
+  ///
+  /// GetNext() will not set *eos=true until all fragment instances have 
either completed
+  /// or have failed.
+  ///
+  /// Returns an error status if an error was encountered either locally or by 
any of the
+  /// remote fragments or if the query was cancelled.
+  ///
+  /// GetNext() is not thread-safe: multiple threads must not make concurrent 
GetNext()
+  /// calls (but may call any of the other member functions concurrently with 
GetNext()).
+  Status GetNext(QueryResultSet* results, int max_rows, bool* eos);
 
   /// Cancel execution of query. This includes the execution of the local plan 
fragment,
-  /// if any, as well as all plan fragments on remote nodes. Sets 
query_status_ to
-  /// the given cause if non-NULL. Otherwise, sets query_status_ to 
Status::CANCELLED.
+  /// if any, as well as all plan fragments on remote nodes. Sets 
query_status_ to the
+  /// given cause if non-NULL. Otherwise, sets query_status_ to 
Status::CANCELLED.
   /// Idempotent.
   void Cancel(const Status* cause = NULL);
 
@@ -151,12 +154,18 @@ class Coordinator {
   /// to CancelInternal().
   Status UpdateFragmentExecStatus(const TReportExecStatusParams& params);
 
-  /// only valid *after* calling Exec(), and may return NULL if there is no 
executor
+  /// Only valid *after* calling Exec(). Return nullptr if the running query 
does not
+  /// produce any rows.
+  ///
+  /// TODO: The only dependency on this is QueryExecState, used to track 
memory for the
+  /// result cache. Remove this dependency, possibly by moving result caching 
inside this
+  /// class.
   RuntimeState* runtime_state();
-  const RowDescriptor& row_desc() const;
 
   /// Only valid after Exec(). Returns runtime_state()->query_mem_tracker() if 
there
   /// is a coordinator fragment, or query_mem_tracker_ (initialized in Exec()) 
otherwise.
+  ///
+  /// TODO: Remove, see runtime_state().
   MemTracker* query_mem_tracker();
 
   /// Get cumulative profile aggregated over all fragments of the query.
@@ -275,8 +284,18 @@ class Coordinator {
   /// Once this is set to true, errors from remote fragments are ignored.
   bool returned_all_results_;
 
-  /// execution state of coordinator fragment
-  boost::scoped_ptr<PlanFragmentExecutor> executor_;
+  /// Non-null if and only if the query produces results for the client; i.e. 
is of
+  /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree 
and return
+  /// them to the client in GetNext(), and also to access the fragment 
instance's runtime
+  /// state.
+  ///
+  /// Result rows are materialized by this fragment instance in its own 
thread. They are
+  /// materialized into a QueryResultSet provided to the coordinator during 
GetNext().
+  ///
+  /// Not owned by this class, created during fragment instance start-up by
+  /// FragmentExecState and set here in Exec().
+  PlanFragmentExecutor* executor_ = nullptr;
+  PlanRootSink* root_sink_ = nullptr;
 
   /// Query mem tracker for this coordinator initialized in Exec(). Only valid 
if there
   /// is no coordinator fragment (i.e. executor_ == NULL). If executor_ is not 
NULL,
@@ -383,7 +402,7 @@ class Coordinator {
   RuntimeProfile::Counter* finalization_timer_;
 
   /// Barrier that is released when all calls to ExecRemoteFragment() have
-  /// returned, successfully or not. Initialised during StartRemoteFragments().
+  /// returned, successfully or not. Initialised during Exec().
   boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
   /// Represents a runtime filter target.
@@ -465,10 +484,9 @@ class Coordinator {
   /// Runs cancel logic. Assumes that lock_ is held.
   void CancelInternal();
 
-  /// Cancels remote fragments. Assumes that lock_ is held.  This can be 
called when
-  /// the query is not being cancelled in the case where the query limit is
-  /// reached.
-  void CancelRemoteFragments();
+  /// Cancels all fragment instances. Assumes that lock_ is held. This may be 
called when
+  /// the query is not being cancelled in the case where the query limit is 
reached.
+  void CancelFragmentInstances();
 
   /// Acquires lock_ and updates query_status_ with 'status' if it's not 
already
   /// an error status, and returns the current query_status_.
@@ -531,30 +549,18 @@ class Coordinator {
   void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
       PermissionCache* permissions_cache);
 
-  /// Validates that all collection-typed slots in the given batch are set to 
NULL.
-  /// See SubplanNode for details on when collection-typed slots are set to 
NULL.
-  /// TODO: This validation will become obsolete when we can return collection 
values.
-  /// We will then need a different mechanism to assert the correct behavior 
of the
-  /// SubplanNode with respect to setting collection-slots to NULL.
-  void ValidateCollectionSlots(RowBatch* batch);
-
-  /// Prepare coordinator fragment for execution (update filter routing table,
-  /// prepare executor, set up output exprs) and create its 
FragmentInstanceState.
-  Status PrepareCoordFragment(std::vector<ExprContext*>* output_expr_ctxs);
-
-  /// Starts all remote fragments contained in the schedule by issuing RPCs in 
parallel,
+  /// Starts all fragment instances contained in the schedule by issuing RPCs 
in parallel,
   /// and then waiting for all of the RPCs to complete.
-  void StartRemoteFragments();
+  void StartFragments();
 
-  /// Starts all remote fragment instances contained in the schedule by 
issuing RPCs in
-  /// parallel and then waiting for all of the RPCs to complete. Also sets up 
and
-  /// registers the state for all non-coordinator fragment instance.
-  void MtStartRemoteFInstances();
+  /// Starts all fragment instances contained in the schedule by issuing RPCs 
in parallel
+  /// and then waiting for all of the RPCs to complete.
+  void MtStartFInstances();
 
   /// Calls CancelInternal() and returns an error if there was any error 
starting the
   /// fragments.
   /// Also updates query_profile_ with the startup latency histogram.
-  Status FinishRemoteInstanceStartup();
+  Status FinishInstanceStartup();
 
   /// Build the filter routing table by iterating over all plan nodes and 
collecting the
   /// filters that they either produce or consume. The source and target 
fragment

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1b3fa14..db0d242 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -24,6 +24,8 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "gen-cpp/CatalogService.h"
+#include "gen-cpp/ImpalaInternalService.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
@@ -36,25 +38,24 @@
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/request-pool-service.h"
-#include "service/frontend.h"
 #include "scheduling/simple-scheduler.h"
+#include "service/fragment-mgr.h"
+#include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
+#include "util/debug-util.h"
 #include "util/default-path-handlers.h"
 #include "util/hdfs-bulk-ops.h"
 #include "util/mem-info.h"
+#include "util/mem-info.h"
+#include "util/memory-metrics.h"
+#include "util/memory-metrics.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/parse-util.h"
-#include "util/memory-metrics.h"
-#include "util/webserver.h"
-#include "util/mem-info.h"
-#include "util/debug-util.h"
-#include "util/memory-metrics.h"
 #include "util/pretty-printer.h"
 #include "util/thread-pool.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/CatalogService.h"
+#include "util/webserver.h"
 
 #include "common/names.h"
 
@@ -145,6 +146,7 @@ ExecEnv::ExecEnv()
     fragment_exec_thread_pool_(new 
CallableThreadPool("coordinator-fragment-rpc",
         "worker", FLAGS_coordinator_rpc_threads, 
numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 
10000)),
+    fragment_mgr_(new FragmentMgr()),
     enable_webserver_(FLAGS_enable_webserver),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -197,6 +199,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
     fragment_exec_thread_pool_(new 
CallableThreadPool("coordinator-fragment-rpc",
         "worker", FLAGS_coordinator_rpc_threads, 
numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 
10000)),
+    fragment_mgr_(new FragmentMgr()),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 303876f..718c6d0 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -96,6 +96,7 @@ class ExecEnv {
   Frontend* frontend() { return frontend_.get(); };
   RequestPoolService* request_pool_service() { return 
request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
+  FragmentMgr* fragment_mgr() { return fragment_mgr_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -137,6 +138,7 @@ class ExecEnv {
   boost::scoped_ptr<Frontend> frontend_;
   boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_;
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
+  boost::scoped_ptr<FragmentMgr> fragment_mgr_;
 
   /// Not owned by this class
   ImpalaServer* impala_server_;

Reply via email to