IMPALA-110: Support for multiple DISTINCT

This patch adds support for multiple aggregate functions in a single
SELECT block that apply DISTINCT over different sets of columns.

Planner design:
- The existing tree-based plan shape with a two-phase aggregation is
  maintained.
- Existing plans are not changed.
- Aggregates are grouped into 'aggregation classes' based on the
  expressions in their DISTINCT portion, which is empty for
  non-distinct aggregates.
- The aggregation framework is generalized to simultaneously process
  multiple aggregation classes within the tree-based plan. This
  process produces the results of different aggregation classes in
  separate rows, so a final aggregation is needed to transpose the
  results into the desired form.
- Main challenge: Each aggregation class consumes and produces
  different tuples, so conceptually a union of tuple types flows
  through the runtime. The tuple union is represented by a TupleRow
  with one tuple per aggregation class, of which only one is non-NULL
  (see the sketch after this list).
- Backend exec nodes in the aggregation plan are aware of this tuple
  union, either explicitly in their implementation or by relying on
  expressions that distinguish the aggregation classes.
- To distinguish the aggregation classes, e.g. in hash exchanges,
  CASE expressions are crafted to hash/group on the appropriate slots.
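
To make the tuple union concrete, below is a minimal standalone sketch
of the idea. The types and names (FakeTuple, FakeTupleRow, ValidTupleIdx)
are illustrative mocks, not the TupleRow/Tuple classes or the ValidTupleId
expr touched by this patch; the sketch only shows the invariant the plan
relies on: one tuple slot per aggregation class, exactly one non-NULL.

  #include <array>
  #include <cassert>
  #include <cstdint>

  // Mock of the tuple union: one tuple slot per aggregation class.
  struct FakeTuple { int64_t agg_value; };

  template <int NUM_AGG_CLASSES>
  struct FakeTupleRow {
    std::array<FakeTuple*, NUM_AGG_CLASSES> tuples{};  // all slots start NULL

    // Returns the index of the single non-NULL tuple, i.e. the aggregation
    // class that produced this row (the role ValidTupleId plays in the plan).
    int ValidTupleIdx() const {
      int idx = -1;
      for (int i = 0; i < NUM_AGG_CLASSES; ++i) {
        if (tuples[i] != nullptr) {
          assert(idx == -1 && "at most one tuple may be set per row");
          idx = i;
        }
      }
      return idx;
    }
  };

  int main() {
    // E.g. SELECT count(DISTINCT a), count(DISTINCT b) FROM t has two
    // aggregation classes; a row produced by class 1 sets only tuple 1.
    FakeTuple t{42};
    FakeTupleRow<2> row;
    row.tuples[1] = &t;
    assert(row.ValidTupleIdx() == 1);
    return 0;
  }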

Deferred FE work:
- Beautify/condense the long CASE exprs
- Push applicable conjuncts into individual aggregators before
  the transposition step
- Address a few testing TODOs that were added to keep the size of this
  patch down
- Decide whether we want to change existing plans to the new model

Execution design:
- Previous patches separated the aggregation logic out of the exec node
  into Aggregators. This is extended to support multiple Aggregators
  per node, each with its own grouping exprs and aggregate functions.
- There is a fast path for aggregations with only one aggregator,
  which leaves the execution essentially unchanged from before.
- When there are multiple aggregators, the first aggregation node in
  the plan replicates its input to each aggregator. The output of this
  step is rows where only a single tuple is non-null, corresponding to
  the aggregator that produced the row.
- A new expr, ValidTupleId, is introduced; it takes one of these rows
  and returns the id of its non-null tuple.
- For subsequent aggregation nodes, the input is split into
  'mini-batches' according to which aggregator each row corresponds to
  (sketched below).
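
The following is a minimal standalone sketch of the replicate-vs-split
behavior described above, using mock types (Row, Aggregator, and the
Consume* helpers are illustrative stand-ins). In the patch itself this
logic lives in AggregationNode::Open() and
AggregationNodeBase::SplitMiniBatches().

  #include <cstddef>
  #include <functional>
  #include <vector>

  // Each row carries the index of its single non-null tuple, which identifies
  // the aggregator that owns it.
  struct Row {
    int valid_tuple_idx;
  };

  struct Aggregator {
    std::function<void(const std::vector<Row>&)> add_batch;
  };

  // First aggregation node in the plan: every aggregator sees the full input.
  void ConsumeReplicated(
      const std::vector<Row>& batch, std::vector<Aggregator>& aggs) {
    for (auto& agg : aggs) agg.add_batch(batch);
  }

  // Later aggregation nodes: each row belongs to exactly one aggregator, so
  // the batch is split into per-aggregator 'mini-batches' before handoff.
  void ConsumeSplit(
      const std::vector<Row>& batch, std::vector<Aggregator>& aggs) {
    std::vector<std::vector<Row>> mini_batches(aggs.size());
    for (const Row& row : batch) mini_batches[row.valid_tuple_idx].push_back(row);
    for (std::size_t i = 0; i < aggs.size(); ++i) {
      if (!mini_batches[i].empty()) aggs[i].add_batch(mini_batches[i]);
    }
  }

  int main() {
    std::vector<Aggregator> aggs(2);
    std::vector<int> counts(2, 0);
    for (int i = 0; i < 2; ++i) {
      aggs[i].add_batch = [&counts, i](const std::vector<Row>& b) {
        counts[i] += static_cast<int>(b.size());
      };
    }
    ConsumeSplit({{0}, {1}, {1}}, aggs);  // rows for aggregators 0, 1 and 1
    return (counts[0] == 1 && counts[1] == 2) ? 0 : 1;
  }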

Testing:
- Added analyzer and planner tests
- Added end-to-end query tests
- Ran hdfs/core tests
- Added support in the query generator and ran it in a loop.

Change-Id: I055402eaef6d81e5f70e850d9f8a621e766830a4
Reviewed-on: http://gerrit.cloudera.org:8080/10771
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/df53ec23
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/df53ec23
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/df53ec23

Branch: refs/heads/master
Commit: df53ec2385190bba2b3cefb43b094cde6d33642f
Parents: 91673fe
Author: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
Authored: Tue May 15 16:41:18 2018 +0000
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Sep 26 03:54:49 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |    2 +-
 be/src/exec/CMakeLists.txt                      |    1 +
 be/src/exec/aggregation-node-base.cc            |  115 ++
 be/src/exec/aggregation-node-base.h             |   67 +
 be/src/exec/aggregation-node.cc                 |  110 +-
 be/src/exec/aggregation-node.h                  |   19 +-
 be/src/exec/aggregator.cc                       |   34 +-
 be/src/exec/aggregator.h                        |   38 +-
 be/src/exec/exec-node.cc                        |    2 +-
 be/src/exec/grouping-aggregator-ir.cc           |   16 +-
 be/src/exec/grouping-aggregator.cc              |   63 +-
 be/src/exec/grouping-aggregator.h               |   78 +-
 be/src/exec/non-grouping-aggregator.cc          |   17 +-
 be/src/exec/non-grouping-aggregator.h           |   20 +-
 be/src/exec/streaming-aggregation-node.cc       |  136 +-
 be/src/exec/streaming-aggregation-node.h        |   23 +-
 be/src/exprs/CMakeLists.txt                     |    1 +
 be/src/exprs/aggregate-functions-ir.cc          |   93 ++
 be/src/exprs/aggregate-functions.h              |   10 +
 be/src/exprs/scalar-expr.cc                     |    4 +
 be/src/exprs/valid-tuple-id.cc                  |   67 +
 be/src/exprs/valid-tuple-id.h                   |   52 +
 be/src/runtime/row-batch.h                      |    8 +
 common/thrift/Exprs.thrift                      |    3 +-
 common/thrift/PlanNodes.thrift                  |   60 +-
 .../apache/impala/analysis/AggregateInfo.java   |  173 +--
 .../impala/analysis/AggregateInfoBase.java      |   14 +-
 .../java/org/apache/impala/analysis/Expr.java   |    1 +
 .../impala/analysis/MultiAggregateInfo.java     |  688 ++++++++++
 .../apache/impala/analysis/NumericLiteral.java  |    4 +
 .../org/apache/impala/analysis/SelectStmt.java  |  113 +-
 .../apache/impala/analysis/StmtRewriter.java    |   35 +-
 .../org/apache/impala/analysis/UnionStmt.java   |   19 +-
 .../impala/analysis/ValidTupleIdExpr.java       |   96 ++
 .../impala/catalog/AggregateFunction.java       |   22 +
 .../org/apache/impala/catalog/BuiltinsDb.java   |   81 ++
 .../apache/impala/planner/AggregationNode.java  |  442 ++++--
 .../impala/planner/DistributedPlanner.java      |  130 +-
 .../impala/planner/SingleNodePlanner.java       |  109 +-
 .../impala/analysis/AnalyzeExprsTest.java       |  106 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   87 +-
 .../apache/impala/analysis/AnalyzerTest.java    |   60 -
 .../org/apache/impala/planner/PlannerTest.java  |   25 +
 .../queries/PlannerTest/distinct.test           |   45 +
 .../PlannerTest/multiple-distinct-limit.test    |  181 +++
 .../multiple-distinct-materialization.test      |  972 +++++++++++++
 .../multiple-distinct-predicates.test           |  498 +++++++
 .../queries/PlannerTest/multiple-distinct.test  | 1280 ++++++++++++++++++
 .../QueryTest/multiple-distinct-aggs.test       |  424 ++++++
 .../queries/QueryTest/spilling-aggs.test        |   60 +
 tests/query_test/test_aggregation.py            |    8 +-
 51 files changed, 5881 insertions(+), 831 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 99a97ae..8175b59 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -52,7 +52,7 @@ ir_functions = [
   ["NON_GROUPING_AGG_ADD_BATCH_IMPL",
    "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"],
   ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL",
-   "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
+   "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEibNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"],
   ["AVG_UPDATE_BIGINT",
    "_ZN6impala18AggregateFunctions9AvgUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE"],
   ["AVG_UPDATE_DOUBLE",

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 4544b95..385aa49 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 
 add_library(Exec
   aggregation-node.cc
+  aggregation-node-base.cc
   aggregator.cc
   analytic-eval-node.cc
   base-sequence-scanner.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
new file mode 100644
index 0000000..5de54be
--- /dev/null
+++ b/be/src/exec/aggregation-node-base.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/aggregation-node-base.h"
+
+#include "exec/grouping-aggregator.h"
+#include "exec/non-grouping-aggregator.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+AggregationNodeBase::AggregationNodeBase(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs), 
replicate_input_(tnode.agg_node.replicate_input) {}
+
+Status AggregationNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
+  // The conjuncts will be evaluated in the Aggregator, so don't pass them to 
the
+  // ExecNode. TODO: remove this once we assign conjuncts directly to 
Aggregators.
+  TPlanNode tnode_no_conjuncts(tnode);
+  tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>());
+  RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state));
+
+  int num_ags = tnode.agg_node.aggregators.size();
+  for (int i = 0; i < num_ags; ++i) {
+    const TAggregator& agg = tnode.agg_node.aggregators[i];
+    unique_ptr<Aggregator> node;
+    if (agg.grouping_exprs.empty()) {
+      node.reset(new NonGroupingAggregator(this, pool_, agg, 
state->desc_tbl(), i));
+    } else {
+      node.reset(new GroupingAggregator(this, pool_, agg, state->desc_tbl(),
+          tnode.agg_node.estimated_input_cardinality, i));
+    }
+    aggs_.push_back(std::move(node));
+    RETURN_IF_ERROR(aggs_[i]->Init(agg, state, tnode.conjuncts));
+    runtime_profile_->AddChild(aggs_[i]->runtime_profile());
+  }
+  DCHECK(aggs_.size() > 0);
+  return Status::OK();
+}
+
+Status AggregationNodeBase::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  for (auto& agg : aggs_) {
+    agg->SetDebugOptions(debug_options_);
+    RETURN_IF_ERROR(agg->Prepare(state));
+  }
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
+  return Status::OK();
+}
+
+void AggregationNodeBase::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+  for (auto& agg : aggs_) agg->Codegen(state);
+}
+
+Status AggregationNodeBase::Reset(RuntimeState* state) {
+  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Reset(state));
+  curr_output_agg_idx_ = 0;
+  return ExecNode::Reset(state);
+}
+
+Status AggregationNodeBase::SplitMiniBatches(
+    RowBatch* batch, vector<unique_ptr<RowBatch>>* mini_batches) {
+  int num_tuples = child(0)->row_desc()->tuple_descriptors().size();
+  int num_rows = batch->num_rows();
+  int last_agg_idx = 0;
+  for (int i = 0; i < num_rows; ++i) {
+    TupleRow* src_row = batch->GetRow(i);
+    int dst_agg_idx = -1;
+    // Optimization that bets that the index of non-null agg will be the same 
from one
+    // tuple to the next.
+    if (src_row->GetTuple(last_agg_idx) != nullptr) {
+      dst_agg_idx = last_agg_idx;
+    } else {
+      for (int j = 0; j < num_tuples; ++j) {
+        if (src_row->GetTuple(j) != nullptr) {
+          dst_agg_idx = j;
+          last_agg_idx = j;
+          break;
+        }
+      }
+    }
+
+    DCHECK_GE(dst_agg_idx, 0);
+    DCHECK_LT(dst_agg_idx, num_tuples);
+
+    RowBatch* mini_batch = (*mini_batches)[dst_agg_idx].get();
+    TupleRow* dst_row = mini_batch->GetRow(mini_batch->AddRow());
+    batch->CopyRow(src_row, dst_row);
+    mini_batch->CommitLastRow();
+  }
+  return Status::OK();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
new file mode 100644
index 0000000..7dfab34
--- /dev/null
+++ b/be/src/exec/aggregation-node-base.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATION_NODE_BASE_H
+#define IMPALA_EXEC_AGGREGATION_NODE_BASE_H
+
+#include <memory>
+
+#include "exec/aggregator.h"
+#include "exec/exec-node.h"
+
+namespace impala {
+
+/// Base class containing common code for the ExecNodes that do aggregation,
+/// AggregationNode and StreamingAggregationNode.
+class AggregationNodeBase : public ExecNode {
+ public:
+  AggregationNodeBase(
+      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state) override;
+
+ protected:
+  /// If true, the input to this node should be passed into each Aggregator in 
'aggs_'.
+  /// Otherwise, the input should be divided between the Aggregators using
+  /// AggregationNodeBase::SplitMiniBatches().
+  const bool replicate_input_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Performs the actual work of aggregating input rows.
+  std::vector<std::unique_ptr<Aggregator>> aggs_;
+
+  /// The index in 'aggs_' of the Aggregator which we are currently returning 
rows from in
+  /// GetNext().
+  int curr_output_agg_idx_ = 0;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  /// Splits the rows of 'batch' up according to which tuple of the row is 
non-null such
+  /// that a row with tuple 'i' non-null is copied into the batch 
'mini_batches[i]'.
+  /// It is expected that all rows of 'batch' have exactly 1 non-null tuple.
+  Status SplitMiniBatches(
+      RowBatch* batch, std::vector<std::unique_ptr<RowBatch>>* mini_batches);
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATION_NODE_BASE_H

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 9f76768..eabe7da 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -19,11 +19,10 @@
 
 #include <sstream>
 
-#include "exec/grouping-aggregator.h"
-#include "exec/non-grouping-aggregator.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -35,38 +34,10 @@ namespace impala {
 
 AggregationNode::AggregationNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs) {}
-
-Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  // The conjuncts will be evaluated in the Aggregator, so don't pass them to 
the
-  // ExecNode. TODO: remove this once we assign conjuncts directly to 
Aggregators.
-  TPlanNode tnode_no_conjuncts(tnode);
-  tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>());
-  RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state));
-  if (tnode.agg_node.grouping_exprs.empty()) {
-    aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, 
state->desc_tbl()));
-  } else {
-    aggregator_.reset(new GroupingAggregator(this, pool_, tnode, 
state->desc_tbl()));
+  : AggregationNodeBase(pool, tnode, descs) {
+  for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) {
+    DCHECK(!tnode.agg_node.aggregators[i].use_streaming_preaggregation);
   }
-  RETURN_IF_ERROR(aggregator_->Init(tnode, state));
-  runtime_profile_->AddChild(aggregator_->runtime_profile());
-  return Status::OK();
-}
-
-Status AggregationNode::Prepare(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Prepare(state));
-  aggregator_->SetDebugOptions(debug_options_);
-  RETURN_IF_ERROR(aggregator_->Prepare(state));
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
-  return Status::OK();
-}
-
-void AggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
-  if (IsNodeCodegenDisabled()) return;
-  aggregator_->Codegen(state);
 }
 
 Status AggregationNode::Open(RuntimeState* state) {
@@ -74,17 +45,53 @@ Status AggregationNode::Open(RuntimeState* state) {
   // Open the child before consuming resources in this node.
   RETURN_IF_ERROR(child(0)->Open(state));
   RETURN_IF_ERROR(ExecNode::Open(state));
+  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Open(state));
+  RowBatch child_batch(child(0)->row_desc(), state->batch_size(), 
mem_tracker());
+
+  int num_aggs = aggs_.size();
+  // Create mini batches.
+  vector<unique_ptr<RowBatch>> mini_batches;
+  if (!replicate_input_ && num_aggs > 1) {
+    for (int i = 0; i < num_aggs; ++i) {
+      mini_batches.push_back(make_unique<RowBatch>(
+          child(0)->row_desc(), state->batch_size(), mem_tracker()));
+    }
+  }
 
-  RETURN_IF_ERROR(aggregator_->Open(state));
-
-  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
   // Read all the rows from the child and process them.
   bool eos = false;
   do {
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
-    RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch));
-    batch.Reset();
+    RETURN_IF_ERROR(children_[0]->GetNext(state, &child_batch, &eos));
+
+    if (num_aggs == 1) {
+      RETURN_IF_ERROR(aggs_[0]->AddBatch(state, &child_batch));
+      child_batch.Reset();
+      continue;
+    }
+
+    if (replicate_input_) {
+      for (auto& agg : aggs_) RETURN_IF_ERROR(agg->AddBatch(state, 
&child_batch));
+      child_batch.Reset();
+      continue;
+    }
+
+    // Separate input batch into mini batches destined for the different aggs.
+    int num_tuples = child(0)->row_desc()->tuple_descriptors().size();
+    DCHECK_EQ(num_aggs, num_tuples);
+    int num_rows = child_batch.num_rows();
+    if (num_rows > 0) {
+      RETURN_IF_ERROR(SplitMiniBatches(&child_batch, &mini_batches));
+
+      for (int i = 0; i < num_tuples; ++i) {
+        RowBatch* mini_batch = mini_batches[i].get();
+        if (mini_batch->num_rows() > 0) {
+          RETURN_IF_ERROR(aggs_[i]->AddBatch(state, mini_batch));
+          mini_batch->Reset();
+        }
+      }
+    }
+    child_batch.Reset();
   } while (!eos);
 
   // The child can be closed at this point in most cases because we have 
consumed all of
@@ -93,7 +100,7 @@ Status AggregationNode::Open(RuntimeState* state) {
   // child again,
   if (!IsInSubplan()) child(0)->Close(state);
 
-  RETURN_IF_ERROR(aggregator_->InputDone());
+  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->InputDone());
   return Status::OK();
 }
 
@@ -102,35 +109,38 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
 
-  if (ReachedLimit()) {
+  if (curr_output_agg_idx_ >= aggs_.size() || ReachedLimit()) {
     *eos = true;
     return Status::OK();
   }
 
-  RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, eos));
+  // With multiple Aggregators, each will only set a single tuple per row. We 
rely on the
+  // other tuples to be null to detect which Aggregator set which row.
+  if (aggs_.size() > 1) row_batch->ClearTuplePointers();
+
+  bool pagg_eos = false;
+  RETURN_IF_ERROR(aggs_[curr_output_agg_idx_]->GetNext(state, row_batch, 
&pagg_eos));
+  if (pagg_eos) ++curr_output_agg_idx_;
+
+  *eos = ReachedLimit() || (pagg_eos && curr_output_agg_idx_ >= aggs_.size());
   num_rows_returned_ += row_batch->num_rows();
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
-Status AggregationNode::Reset(RuntimeState* state) {
-  RETURN_IF_ERROR(aggregator_->Reset(state));
-  return ExecNode::Reset(state);
-}
-
 void AggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   // All expr mem allocations should happen in the Aggregator.
   DCHECK(expr_results_pool() == nullptr
       || expr_results_pool()->total_allocated_bytes() == 0);
-  aggregator_->Close(state);
+  for (auto& agg : aggs_) agg->Close(state);
   ExecNode::Close(state);
 }
 
 void AggregationNode::DebugString(int indentation_level, stringstream* out) 
const {
   *out << string(indentation_level * 2, ' ');
-  *out << "AggregationNode("
-       << "aggregator=" << aggregator_->DebugString();
+  *out << "AggregationNode(";
+  for (auto& agg : aggs_) agg->DebugString(indentation_level, out);
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
index 527a62d..dfec577 100644
--- a/be/src/exec/aggregation-node.h
+++ b/be/src/exec/aggregation-node.h
@@ -20,8 +20,7 @@
 
 #include <memory>
 
-#include "exec/aggregator.h"
-#include "exec/exec-node.h"
+#include "exec/aggregation-node-base.h"
 
 namespace impala {
 
@@ -31,29 +30,15 @@ class RuntimeState;
 /// Node for doing partitioned hash aggregation.
 /// This node consumes the input from child(0) during Open() and then passes 
it to the
 /// Aggregator, which does the actual work of aggregating.
-class AggregationNode : public ExecNode {
+class AggregationNode : public AggregationNodeBase {
  public:
   AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
-  virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override;
-  virtual Status Reset(RuntimeState* state) override;
   virtual void Close(RuntimeState* state) override;
 
   virtual void DebugString(int indentation_level, std::stringstream* out) 
const override;
-
- private:
-  /////////////////////////////////////////
-  /// BEGIN: Members that must be Reset()
-
-  /// Performs the actual work of aggregating input rows.
-  std::unique_ptr<Aggregator> aggregator_;
-
-  /// END: Members that must be Reset()
-  /////////////////////////////////////////
 };
 } // namespace impala
 

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 9e79bf4..a63d364 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -43,41 +43,41 @@ namespace impala {
 
 const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
 
-Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& 
tnode,
-    const DescriptorTbl& descs, const std::string& name)
+Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
+    const TAggregator& taggregator, const DescriptorTbl& descs, const 
std::string& name,
+    int agg_idx)
   : id_(exec_node->id()),
     exec_node_(exec_node),
+    agg_idx_(agg_idx),
     pool_(pool),
-    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
+    intermediate_tuple_id_(taggregator.intermediate_tuple_id),
     intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
-    output_tuple_id_(tnode.agg_node.output_tuple_id),
+    output_tuple_id_(taggregator.output_tuple_id),
     output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
     row_desc_(*exec_node->row_desc()),
     input_row_desc_(*exec_node->child(0)->row_desc()),
-    needs_finalize_(tnode.agg_node.need_finalize),
-    runtime_profile_(RuntimeProfile::Create(pool_, name)),
-    num_rows_returned_(0),
-    rows_returned_counter_(nullptr),
-    build_timer_(nullptr) {}
+    needs_finalize_(taggregator.need_finalize),
+    runtime_profile_(RuntimeProfile::Create(pool_, name)) {}
 
 Aggregator::~Aggregator() {}
 
-Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status Aggregator::Init(const TAggregator& taggregator, RuntimeState* state,
+    const std::vector<TExpr>& conjuncts) {
   DCHECK(intermediate_tuple_desc_ != nullptr);
   DCHECK(output_tuple_desc_ != nullptr);
   DCHECK_EQ(intermediate_tuple_desc_->slots().size(), 
output_tuple_desc_->slots().size());
 
-  int j = num_grouping_exprs();
-  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
+  int j = GetNumGroupingExprs();
+  for (int i = 0; i < taggregator.aggregate_functions.size(); ++i, ++j) {
     SlotDescriptor* intermediate_slot_desc = 
intermediate_tuple_desc_->slots()[j];
     SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
     AggFn* agg_fn;
-    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], 
input_row_desc_,
+    RETURN_IF_ERROR(AggFn::Create(taggregator.aggregate_functions[i], 
input_row_desc_,
         *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
     agg_fns_.push_back(agg_fn);
   }
 
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.conjuncts, row_desc_, state, 
&conjuncts_));
+  RETURN_IF_ERROR(ScalarExpr::Create(conjuncts, row_desc_, state, 
&conjuncts_));
   return Status::OK();
 }
 
@@ -123,7 +123,7 @@ void Aggregator::Close(RuntimeState* state) {
 void Aggregator::InitAggSlots(
     const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
   vector<SlotDescriptor*>::const_iterator slot_desc =
-      intermediate_tuple_desc_->slots().begin() + num_grouping_exprs();
+      intermediate_tuple_desc_->slots().begin() + GetNumGroupingExprs();
   for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
     // To minimize branching on the UpdateTuple path, initialize the result 
value so that
     // the Add() UDA function can ignore the NULL bit of its destination 
value. E.g. for
@@ -184,7 +184,7 @@ Tuple* Aggregator::GetOutputTuple(
   // Copy grouping values from tuple to dst.
   // TODO: Codegen this.
   if (dst != tuple) {
-    int num_grouping_slots = num_grouping_exprs();
+    int num_grouping_slots = GetNumGroupingExprs();
     for (int i = 0; i < num_grouping_slots; ++i) {
       SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
       SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
@@ -568,7 +568,7 @@ Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn)
 
   // Loop over each expr and generate the IR for that slot.  If the expr is not
   // count(*), generate a helper IR function to update the slot and call that.
-  int j = num_grouping_exprs();
+  int j = GetNumGroupingExprs();
   for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
     AggFn* agg_fn = agg_fns_[i];

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 1ea3c47..bd2f10e 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -48,7 +48,7 @@ class RuntimeState;
 class ScalarExpr;
 class ScalarExprEvaluator;
 class SlotDescriptor;
-class TPlanNode;
+class TAggregator;
 class Tuple;
 class TupleDescriptor;
 class TupleRow;
@@ -60,13 +60,15 @@ class TupleRow;
 /// be called and the results can be fetched with GetNext().
 class Aggregator {
  public:
-  Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
-      const DescriptorTbl& descs, const std::string& name);
+  /// 'agg_idx' is the index of 'taggregator' in the parent TAggregationNode.
+  Aggregator(ExecNode* exec_node, ObjectPool* pool, const TAggregator& 
taggregator,
+      const DescriptorTbl& descs, const std::string& name, int agg_idx);
   virtual ~Aggregator();
 
   /// Aggregators follow the same lifecycle as ExecNodes, except that after 
Open() and
   /// before GetNext() rows should be added with AddBatch(), followed by 
InputDone()[
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) 
WARN_UNUSED_RESULT;
+  virtual Status Init(const TAggregator& taggregator, RuntimeState* state,
+      const std::vector<TExpr>& conjuncts) WARN_UNUSED_RESULT;
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
   virtual void Codegen(RuntimeState* state) = 0;
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
@@ -77,10 +79,21 @@ class Aggregator {
 
   /// Adds all of the rows in 'batch' to the aggregation.
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) = 0;
+
+  /// Used to insert input rows if this is a streaming pre-agg. Tries to 
aggregate all of
+  /// the rows of 'child_batch', but if there isn't enough memory available 
rows will be
+  /// streamed through and returned in 'out_batch'. If 'eos' is true, 
'child_batch' was
+  /// fully processed, otherwise 'out_batch' was filled up and 
AddBatchStreaming() should
+  /// be called again with the same 'child_batch' and a new 'out_batch'.
+  /// AddBatch() and AddBatchStreaming() should not be called on the same 
Aggregator.
+  virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch,
+      RowBatch* child_batch, bool* eos) WARN_UNUSED_RESULT = 0;
+
   /// Indicates that all batches have been added. Must be called before 
GetNext().
-  virtual Status InputDone() = 0;
+  virtual Status InputDone() WARN_UNUSED_RESULT = 0;
+
+  virtual int GetNumGroupingExprs() = 0;
 
-  virtual int num_grouping_exprs() = 0;
   RuntimeProfile* runtime_profile() { return runtime_profile_; }
 
   virtual void SetDebugOptions(const TDebugOptions& debug_options) = 0;
@@ -92,8 +105,13 @@ class Aggregator {
 
  protected:
   /// The id of the ExecNode this Aggregator corresponds to.
-  int id_;
+  const int id_;
   ExecNode* exec_node_;
+
+  /// The index of this Aggregator within the AggregationNode. When returning 
output, this
+  /// Aggregator should only write tuples at 'agg_idx_' within the row.
+  const int agg_idx_;
+
   ObjectPool* pool_;
 
   /// Account for peak memory used by this aggregator.
@@ -154,11 +172,11 @@ class Aggregator {
   /// Runtime profile for this aggregator. Owned by 'pool_'.
   RuntimeProfile* const runtime_profile_;
 
-  int64_t num_rows_returned_;
-  RuntimeProfile::Counter* rows_returned_counter_;
+  int64_t num_rows_returned_ = 0;
+  RuntimeProfile::Counter* rows_returned_counter_ = nullptr;
 
   /// Time spent processing the child rows
-  RuntimeProfile::Counter* build_timer_;
+  RuntimeProfile::Counter* build_timer_ = nullptr;
 
   /// Initializes the aggregate function slots of an intermediate tuple.
   /// Any var-len data is allocated from the FunctionContexts.

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index bbb9a4b..b9406dd 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -274,7 +274,7 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      if (tnode.agg_node.use_streaming_preaggregation) {
+      if (tnode.agg_node.aggregators[0].use_streaming_preaggregation) {
         *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
       } else {
         *node = pool->Add(new AggregationNode(pool, tnode, descs));

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc
index d3dbf17..9f7c399 100644
--- a/be/src/exec/grouping-aggregator-ir.cc
+++ b/be/src/exec/grouping-aggregator-ir.cc
@@ -152,18 +152,18 @@ Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partitio
   }
 }
 
-Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize,
+Status GroupingAggregator::AddBatchStreamingImpl(int agg_idx, bool 
needs_serialize,
     TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
     HashTableCtx* __restrict__ ht_ctx, int 
remaining_capacity[PARTITION_FANOUT]) {
   DCHECK(is_streaming_preagg_);
-  DCHECK_EQ(out_batch->num_rows(), 0);
-  DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
+  DCHECK(!out_batch->AtCapacity());
 
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int num_rows = in_batch->num_rows();
   const int cache_size = expr_vals_cache->capacity();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) 
{
+  for (int group_start = streaming_idx_; group_start < num_rows;
+       group_start += cache_size) {
     EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, 
ht_ctx);
 
     FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
@@ -184,15 +184,21 @@ Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize,
           return std::move(add_batch_status_);
         }
         UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
-        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+        out_batch_iterator.Get()->SetTuple(agg_idx, intermediate_tuple);
         out_batch_iterator.Next();
         out_batch->CommitLastRow();
+        if (out_batch->AtCapacity() && in_batch_iter.RowNum() + 1 < num_rows) {
+          streaming_idx_ = in_batch_iter.RowNum() + 1;
+          goto ret;
+        }
       }
       DCHECK(add_batch_status_.ok());
       expr_vals_cache->NextRow();
     }
     DCHECK(expr_vals_cache->AtEnd());
   }
+  streaming_idx_ = 0;
+ret:
   if (needs_serialize) {
     FOREACH_ROW(out_batch, 0, out_batch_iter) {
       AggFnEvaluator::Serialize(agg_fn_evals_, 
out_batch_iter.Get()->GetTuple(0));

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 0ce2a71..9b38a0c 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -36,6 +36,7 @@
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "util/runtime-profile-counters.h"
+#include "util/string-parser.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 
@@ -85,40 +86,24 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TPlanNode& tnode, const DescriptorTbl& descs)
-  : Aggregator(exec_node, pool, tnode, descs, "GroupingAggregator"),
+    const TAggregator& taggregator, const DescriptorTbl& descs,
+    int64_t estimated_input_cardinality, int agg_idx)
+  : Aggregator(exec_node, pool, taggregator, descs,
+        Substitute("GroupingAggregator $0", agg_idx), agg_idx),
     intermediate_row_desc_(intermediate_tuple_desc_, false),
-    is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
-    needs_serialize_(false),
-    output_partition_(nullptr),
-    resource_profile_(exec_node->resource_profile()),
-    num_input_rows_(0),
+    is_streaming_preagg_(taggregator.use_streaming_preaggregation),
+    resource_profile_(taggregator.resource_profile),
     is_in_subplan_(exec_node->IsInSubplan()),
     limit_(exec_node->limit()),
-    add_batch_impl_fn_(nullptr),
-    add_batch_streaming_impl_fn_(nullptr),
-    ht_resize_timer_(nullptr),
-    get_results_timer_(nullptr),
-    num_hash_buckets_(nullptr),
-    partitions_created_(nullptr),
-    max_partition_level_(nullptr),
-    num_row_repartitioned_(nullptr),
-    num_repartitions_(nullptr),
-    num_spilled_partitions_(nullptr),
-    largest_partition_percent_(nullptr),
-    streaming_timer_(nullptr),
-    num_passthrough_rows_(nullptr),
-    preagg_estimated_reduction_(nullptr),
-    preagg_streaming_ht_min_reduction_(nullptr),
-    estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
-    partition_eos_(false),
+    estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
 }
 
-Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status GroupingAggregator::Init(const TAggregator& taggregator, RuntimeState* 
state,
+    const std::vector<TExpr>& conjuncts) {
   RETURN_IF_ERROR(ScalarExpr::Create(
-      tnode.agg_node.grouping_exprs, input_row_desc_, state, 
&grouping_exprs_));
+      taggregator.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
 
   // Construct build exprs from intermediate_row_desc_
   for (int i = 0; i < grouping_exprs_.size(); ++i) {
@@ -133,7 +118,7 @@ Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
     if (build_expr->type().IsVarLenStringType()) 
string_grouping_exprs_.push_back(i);
   }
 
-  RETURN_IF_ERROR(Aggregator::Init(tnode, state));
+  RETURN_IF_ERROR(Aggregator::Init(taggregator, state, conjuncts));
   for (int i = 0; i < agg_fns_.size(); ++i) {
     needs_serialize_ |= agg_fns_[i]->SupportsSerialize();
   }
@@ -283,7 +268,7 @@ Status GroupingAggregator::GetRowsFromPartition(
     Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals,
         intermediate_tuple, row_batch->tuple_data_pool());
     output_iterator_.Next();
-    row->SetTuple(0, output_tuple);
+    row->SetTuple(agg_idx_, output_tuple);
     DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
     if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), 
row)) {
       row_batch->CommitLastRow();
@@ -382,6 +367,7 @@ void GroupingAggregator::CleanupHashTbl(
 Status GroupingAggregator::Reset(RuntimeState* state) {
   DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
   partition_eos_ = false;
+  streaming_idx_ = 0;
   // Reset the HT and the partitions for this grouping agg.
   ht_ctx_->set_level(0);
   ClosePartitions();
@@ -399,7 +385,6 @@ void GroupingAggregator::Close(RuntimeState* state) {
   }
   ScalarExpr::Close(grouping_exprs_);
   ScalarExpr::Close(build_exprs_);
-
   reservation_manager_.Close(state);
   if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
   // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be 
closed.
@@ -422,7 +407,8 @@ Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
 }
 
 Status GroupingAggregator::AddBatchStreaming(
-    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch) {
+    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* 
eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(streaming_timer_);
   RETURN_IF_ERROR(QueryMaintenance(state));
   num_input_rows_ += child_batch->num_rows();
@@ -457,12 +443,13 @@ Status GroupingAggregator::AddBatchStreaming(
 
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
   if (add_batch_streaming_impl_fn_ != nullptr) {
-    RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, needs_serialize_, 
prefetch_mode,
-        child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
+    RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, agg_idx_, 
needs_serialize_,
+        prefetch_mode, child_batch, out_batch, ht_ctx_.get(), 
remaining_capacity));
   } else {
-    RETURN_IF_ERROR(AddBatchStreamingImpl(needs_serialize_, prefetch_mode, 
child_batch,
-        out_batch, ht_ctx_.get(), remaining_capacity));
+    RETURN_IF_ERROR(AddBatchStreamingImpl(agg_idx_, needs_serialize_, 
prefetch_mode,
+        child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
   }
+  *eos = (streaming_idx_ == 0);
 
   num_rows_returned_ += out_batch->num_rows();
   COUNTER_SET(num_passthrough_rows_, num_rows_returned_);
@@ -1040,12 +1027,16 @@ Status GroupingAggregator::CodegenAddBatchStreamingImpl(
   llvm::Function* add_batch_streaming_impl_fn = codegen->GetFunction(ir_fn, 
true);
   DCHECK(add_batch_streaming_impl_fn != nullptr);
 
+  // Make agg_idx arg constant.
+  llvm::Value* agg_idx_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 
2);
+  agg_idx_arg->replaceAllUsesWith(codegen->GetI32Constant(agg_idx_));
+
   // Make needs_serialize arg constant so dead code can be optimised out.
-  llvm::Value* needs_serialize_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 2);
+  llvm::Value* needs_serialize_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 3);
   
needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_));
 
   // Replace prefetch_mode with constant so branches can be optimised out.
-  llvm::Value* prefetch_mode_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 3);
+  llvm::Value* prefetch_mode_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 4);
   
prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
 
   llvm::Function* update_tuple_fn;

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index fb95cd7..6f630d4 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -36,6 +36,7 @@ class AggFnEvaluator;
 class LlvmCodeGen;
 class RowBatch;
 class RuntimeState;
+class TAggregator;
 class Tuple;
 
 /// Aggregator for doing grouping aggregations. Input is passed to the 
aggregator through
@@ -113,10 +114,12 @@ class Tuple;
 /// TODO: support an Init() method with an initial value in the UDAF interface.
 class GroupingAggregator : public Aggregator {
  public:
-  GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& 
tnode,
-      const DescriptorTbl& descs);
+  GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+      const TAggregator& taggregator, const DescriptorTbl& descs,
+      int64_t estimated_input_cardinality, int agg_idx);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TAggregator& taggregator, RuntimeState* state,
+      const std::vector<TExpr>& conjuncts) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -125,15 +128,11 @@ class GroupingAggregator : public Aggregator {
   virtual void Close(RuntimeState* state) override;
 
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
-  /// Used to insert input rows if this is a streaming pre-agg. Tries to 
aggregate all of
-  /// the rows of 'child_batch', but if there isn't enough memory available 
rows will be
-  /// streamed through and returned in 'out_batch'. AddBatch() and 
AddBatchStreaming()
-  /// should not be called on the same GroupingAggregator.
-  Status AddBatchStreaming(
-      RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch);
-  virtual Status InputDone() override WARN_UNUSED_RESULT;
+  virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch,
+      RowBatch* child_batch, bool* eos) override;
+  virtual Status InputDone() override;
 
-  virtual int num_grouping_exprs() override { return grouping_exprs_.size(); }
+  virtual int GetNumGroupingExprs() override { return grouping_exprs_.size(); }
 
   virtual void SetDebugOptions(const TDebugOptions& debug_options) override;
 
@@ -184,7 +183,7 @@ class GroupingAggregator : public Aggregator {
   const bool is_streaming_preagg_;
 
   /// True if any of the evaluators require the serialize step.
-  bool needs_serialize_;
+  bool needs_serialize_ = false;
 
   /// Exprs used to evaluate input rows
   std::vector<ScalarExpr*> grouping_exprs_;
@@ -211,7 +210,7 @@ class GroupingAggregator : public Aggregator {
   /// The current partition and iterator to the next row in its hash table 
that we need
   /// to return in GetNext(). If 'output_iterator_' is not AtEnd() then
   /// 'output_partition_' is not nullptr.
-  Partition* output_partition_;
+  Partition* output_partition_ = nullptr;
   HashTable::Iterator output_iterator_;
 
   /// Resource information sent from the frontend.
@@ -222,7 +221,7 @@ class GroupingAggregator : public Aggregator {
   BufferPool::ClientHandle* buffer_pool_client();
 
   /// The number of rows that have been passed to AddBatch() or 
AddBatchStreaming().
-  int64_t num_input_rows_;
+  int64_t num_input_rows_ = 0;
 
   /// True if this aggregator is being executed in a subplan.
   const bool is_in_subplan_;
@@ -233,52 +232,52 @@ class GroupingAggregator : public Aggregator {
   typedef Status (*AddBatchImplFn)(
       GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
-  AddBatchImplFn add_batch_impl_fn_;
+  AddBatchImplFn add_batch_impl_fn_ = nullptr;
 
-  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, bool,
+  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, int, bool,
       TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, 
int[PARTITION_FANOUT]);
   /// Jitted AddBatchStreamingImpl function pointer.  Null if codegen is 
disabled.
-  AddBatchStreamingImplFn add_batch_streaming_impl_fn_;
+  AddBatchStreamingImplFn add_batch_streaming_impl_fn_ = nullptr;
 
   /// Total time spent resizing hash tables.
-  RuntimeProfile::Counter* ht_resize_timer_;
+  RuntimeProfile::Counter* ht_resize_timer_ = nullptr;
 
   /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
+  RuntimeProfile::Counter* get_results_timer_ = nullptr;
 
   /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
+  RuntimeProfile::Counter* num_hash_buckets_ = nullptr;
 
   /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
+  RuntimeProfile::Counter* partitions_created_ = nullptr;
 
   /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
+  RuntimeProfile::HighWaterMarkCounter* max_partition_level_ = nullptr;
 
   /// Number of rows that have been repartitioned.
-  RuntimeProfile::Counter* num_row_repartitioned_;
+  RuntimeProfile::Counter* num_row_repartitioned_ = nullptr;
 
   /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
+  RuntimeProfile::Counter* num_repartitions_ = nullptr;
 
   /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
+  RuntimeProfile::Counter* num_spilled_partitions_ = nullptr;
 
   /// The largest fraction after repartitioning. This is expected to be
   /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
+  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_ = nullptr;
 
   /// Time spent in streaming preagg algorithm.
-  RuntimeProfile::Counter* streaming_timer_;
+  RuntimeProfile::Counter* streaming_timer_ = nullptr;
 
   /// The number of rows passed through without aggregation.
-  RuntimeProfile::Counter* num_passthrough_rows_;
+  RuntimeProfile::Counter* num_passthrough_rows_ = nullptr;
 
   /// The estimated reduction of the preaggregation.
-  RuntimeProfile::Counter* preagg_estimated_reduction_;
+  RuntimeProfile::Counter* preagg_estimated_reduction_ = nullptr;
 
   /// Expose the minimum reduction factor to continue growing the hash tables.
-  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
+  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_ = nullptr;
 
   /// The estimated number of input rows from the planner.
   int64_t estimated_input_cardinality_;
@@ -289,7 +288,14 @@ class GroupingAggregator : public Aggregator {
   /// BEGIN: Members that must be Reset()
 
   /// If true, no more rows to output from partitions.
-  bool partition_eos_;
+  bool partition_eos_ = false;
+
+  /// When streaming rows through unaggregated, if the out batch reaches 
capacity before
+  /// the input batch is fully processed, 'streaming_idx_' indicates the 
position within
+  /// the input batch to resume at in the next call to AddBatchStreaming(). 
This is used
+  /// in the case where there are multiple aggregators, as the out batch 
passed to
+  /// AddBatchStreaming() may already have rows passed through by another 
aggregator.
+  int32_t streaming_idx_ = 0;
 
   /// Used for hash-related functionality, such as evaluating rows and 
calculating hashes.
   /// It also owns the evaluators for the grouping and build expressions used 
during hash
@@ -503,8 +509,8 @@ class GroupingAggregator : public Aggregator {
   /// into the hash table or added to 'out_batch' in the intermediate tuple 
format.
   /// 'in_batch' is processed entirely, and 'out_batch' must have enough 
capacity to
   /// store all of the rows in 'in_batch'.
-  /// 'needs_serialize' is an argument so that codegen can replace it with a 
constant,
-  ///     rather than using the member variable 'needs_serialize_'.
+  /// 'agg_idx' and 'needs_serialize' are arguments so that codegen can 
replace them with
+  ///     constants, rather than using the member variables of the same names.
   /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
   ///     hash table buckets will be prefetched based on the hash values 
computed. Note
   ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
@@ -512,9 +518,9 @@ class GroupingAggregator : public Aggregator {
   ///     additional rows that can be added to the hash table per partition. 
It is updated
   ///     by AddBatchStreamingImpl() when it inserts new rows.
   /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the 
optimiser.
-  Status AddBatchStreamingImpl(bool needs_serialize, TPrefetchMode::type 
prefetch_mode,
-      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
-      int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
+  Status AddBatchStreamingImpl(int agg_idx, bool needs_serialize,
+      TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* 
out_batch,
+      HashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) 
WARN_UNUSED_RESULT;
 
   /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' 
for streaming
   /// aggregation. The input row must have been evaluated with 'ht_ctx', with 
'hash' set

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/non-grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 1ee4e46..2bcaa98 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -36,11 +36,9 @@
 namespace impala {
 
 NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* 
pool,
-    const TPlanNode& tnode, const DescriptorTbl& descs)
-  : Aggregator(exec_node, pool, tnode, descs, "NonGroupingAggregator"),
-    add_batch_impl_fn_(nullptr),
-    singleton_output_tuple_(nullptr),
-    singleton_output_tuple_returned_(true) {}
+    const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx)
+  : Aggregator(exec_node, pool, taggregator, descs,
+        Substitute("NonGroupingAggregator $0", agg_idx), agg_idx) {}
 
 Status NonGroupingAggregator::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Prepare(state));
@@ -84,6 +82,7 @@ Status NonGroupingAggregator::GetNext(
 void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
   int row_idx = row_batch->AddRow();
   TupleRow* row = row_batch->GetRow(row_idx);
+  row_batch->ClearRow(row);
   // The output row batch may reference memory allocated by Serialize() or 
Finalize(),
   // allocating that memory directly from the row batch's pool means we can 
safely return
   // the batch.
@@ -91,7 +90,7 @@ void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
       ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
   Tuple* output_tuple = GetOutputTuple(
       agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool());
-  row->SetTuple(0, output_tuple);
+  row->SetTuple(agg_idx_, output_tuple);
   if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), 
row)) {
     row_batch->CommitLastRow();
     ++num_rows_returned_;
@@ -128,6 +127,12 @@ Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   return Status::OK();
 }
 
+Status NonGroupingAggregator::AddBatchStreaming(
+    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* 
eos) {
+  *eos = true;
+  return AddBatch(state, child_batch);
+}
+
 Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple(
     const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
   Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), 
pool);

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/non-grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 41b3e0d..49a663d 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -33,7 +33,7 @@ class LlvmCodeGen;
 class ObjectPool;
 class RowBatch;
 class RuntimeState;
-class TPlanNode;
+class TAggregator;
 class Tuple;
 
 /// Aggregator for doing non-grouping aggregations. Input is passed to the 
aggregator
@@ -41,8 +41,8 @@ class Tuple;
 /// not support streaming preaggregation.
 class NonGroupingAggregator : public Aggregator {
  public:
-  NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const 
TPlanNode& tnode,
-      const DescriptorTbl& descs);
+  NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+      const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
@@ -52,9 +52,15 @@ class NonGroupingAggregator : public Aggregator {
   virtual void Close(RuntimeState* state) override;
 
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
+
+  /// NonGroupingAggregators behave the same in streaming and non-streaming contexts, so
+  /// this just calls AddBatch.
+  virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch,
+      RowBatch* child_batch, bool* eos) override;
+
   virtual Status InputDone() override { return Status::OK(); }
 
-  virtual int num_grouping_exprs() override { return 0; }
+  virtual int GetNumGroupingExprs() override { return 0; }
 
   /// NonGroupingAggregator doesn't create a buffer pool client so it doesn't need the
   /// debug options.
@@ -72,7 +78,7 @@ class NonGroupingAggregator : public Aggregator {
 
   typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
-  AddBatchImplFn add_batch_impl_fn_;
+  AddBatchImplFn add_batch_impl_fn_ = nullptr;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
@@ -80,8 +86,8 @@ class NonGroupingAggregator : public Aggregator {
   /// Result of aggregation w/o GROUP BY.
   /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
   /// e.g. select 1 from table group by col.
-  Tuple* singleton_output_tuple_;
-  bool singleton_output_tuple_returned_;
+  Tuple* singleton_output_tuple_ = nullptr;
+  bool singleton_output_tuple_returned_ = true;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
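
As context for the agg_idx_ changes above, a minimal self-contained sketch of the
tuple-union convention from the commit message: each output row carries one tuple slot
per aggregation class, and an aggregator clears the row and then fills only its own
slot. The types and names below (Tuple, TupleRow, EmitOutputRow) are illustrative
stand-ins, not the real Impala classes.

  #include <cassert>
  #include <vector>

  struct Tuple {};  // toy stand-in for impala::Tuple

  // A "row" in the aggregation plan: one tuple slot per aggregation class.
  using TupleRow = std::vector<Tuple*>;

  // Mirrors the GetSingletonOutput() change: clear every slot, then set only this
  // aggregator's slot so consumers can tell which class produced the row.
  void EmitOutputRow(TupleRow* row, int agg_idx, Tuple* output_tuple) {
    for (Tuple*& t : *row) t = nullptr;  // analogous to row_batch->ClearRow(row)
    (*row)[agg_idx] = output_tuple;      // analogous to row->SetTuple(agg_idx_, ...)
  }

  int main() {
    Tuple t;
    TupleRow row(3, nullptr);  // three aggregation classes
    EmitOutputRow(&row, 1, &t);
    assert(row[0] == nullptr && row[1] == &t && row[2] == nullptr);
    return 0;
  }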

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/streaming-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 7280849..7cbebb8 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -22,6 +22,7 @@
 #include "gutil/strings/substitute.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
@@ -32,35 +33,12 @@ namespace impala {
 
 StreamingAggregationNode::StreamingAggregationNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs), child_eos_(false) {
+  : AggregationNodeBase(pool, tnode, descs) {
   DCHECK(tnode.conjuncts.empty()) << "Preaggs have no conjuncts";
-  DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
   DCHECK(limit_ == -1) << "Preaggs have no limits";
-}
-
-Status StreamingAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl()));
-  RETURN_IF_ERROR(aggregator_->Init(tnode, state));
-  runtime_profile_->AddChild(aggregator_->runtime_profile());
-  return Status::OK();
-}
-
-Status StreamingAggregationNode::Prepare(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Prepare(state));
-  aggregator_->SetDebugOptions(debug_options_);
-  RETURN_IF_ERROR(aggregator_->Prepare(state));
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
-  return Status::OK();
-}
-
-void StreamingAggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
-  if (IsNodeCodegenDisabled()) return;
-
-  aggregator_->Codegen(state);
+  for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) {
+    DCHECK(tnode.agg_node.aggregators[i].use_streaming_preaggregation);
+  }
 }
 
 Status StreamingAggregationNode::Open(RuntimeState* state) {
@@ -69,7 +47,7 @@ Status StreamingAggregationNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(child(0)->Open(state));
   RETURN_IF_ERROR(ExecNode::Open(state));
 
-  RETURN_IF_ERROR(aggregator_->Open(state));
+  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Open(state));
 
   // Streaming preaggregations do all processing in GetNext().
   return Status::OK();
@@ -86,15 +64,22 @@ Status StreamingAggregationNode::GetNext(
     return Status::OK();
   }
 
-  bool aggregator_eos = false;
-  if (!child_eos_) {
+  // With multiple Aggregators, each will only set a single tuple per row. We rely on the
+  // other tuples to be null to detect which Aggregator set which row.
+  if (aggs_.size() > 1) row_batch->ClearTuplePointers();
+
+  if (!child_eos_ || !child_batch_processed_) {
     // For streaming preaggregations, we process rows from the child as we go.
     RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
+    *eos = false;
   } else {
-    RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, &aggregator_eos));
+    bool aggregator_eos = false;
+    RETURN_IF_ERROR(
+        aggs_[curr_output_agg_idx_]->GetNext(state, row_batch, &aggregator_eos));
+    if (aggregator_eos) ++curr_output_agg_idx_;
+    *eos = curr_output_agg_idx_ >= aggs_.size();
   }
 
-  *eos = aggregator_eos && child_eos_;
   num_rows_returned_ += row_batch->num_rows();
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
@@ -102,27 +87,94 @@ Status StreamingAggregationNode::GetNext(
 
 Status StreamingAggregationNode::GetRowsStreaming(
     RuntimeState* state, RowBatch* out_batch) {
-  DCHECK(!child_eos_);
-
   if (child_batch_ == nullptr) {
     child_batch_.reset(
         new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
   }
 
+  int num_aggs = aggs_.size();
+  // Create mini batches.
+  vector<unique_ptr<RowBatch>> mini_batches;
+  if (!replicate_input_ && num_aggs > 1) {
+    for (int i = 0; i < num_aggs; ++i) {
+      mini_batches.push_back(make_unique<RowBatch>(
+          child(0)->row_desc(), state->batch_size(), mem_tracker()));
+    }
+  }
+
   do {
     DCHECK_EQ(out_batch->num_rows(), 0);
     RETURN_IF_CANCELLED(state);
 
-    RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
-
-    RETURN_IF_ERROR(aggregator_->AddBatchStreaming(state, out_batch, child_batch_.get()));
-    child_batch_->Reset(); // All rows from child_batch_ were processed.
+    if (child_batch_processed_) {
+      DCHECK_EQ(child_batch_->num_rows(), 0);
+      RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
+      child_batch_processed_ = false;
+    }
+
+    if (num_aggs == 1) {
+      RETURN_IF_ERROR(aggs_[0]->AddBatchStreaming(
+          state, out_batch, child_batch_.get(), &child_batch_processed_));
+      // We're not guaranteed to be able to stream the entirety of 'child_batch_' into
+      // 'out_batch' as AddBatchStreaming() will attach all var-len data to 'out_batch'
+      // and 'child_batch_' may have been referencing data that wasn't attached to it.
+      if (child_batch_processed_) {
+        child_batch_->Reset();
+      }
+      continue;
+    }
+
+    if (replicate_input_) {
+      bool eos = false;
+      while (replicate_agg_idx_ < num_aggs) {
+        RETURN_IF_ERROR(aggs_[replicate_agg_idx_]->AddBatchStreaming(
+            state, out_batch, child_batch_.get(), &eos));
+        if (eos) ++replicate_agg_idx_;
+        if (out_batch->AtCapacity()) break;
+        // If out_batch isn't full, we must have processed the entire input.
+        DCHECK(eos);
+      }
+      if (replicate_agg_idx_ == num_aggs) {
+        replicate_agg_idx_ = 0;
+        child_batch_processed_ = true;
+        child_batch_->Reset();
+      }
+      continue;
+    }
+
+    // Separate input batch into mini batches destined for the different aggs.
+    int num_tuples = child(0)->row_desc()->tuple_descriptors().size();
+    DCHECK_EQ(num_aggs, num_tuples);
+    int num_rows = child_batch_->num_rows();
+    DCHECK_LE(num_rows, out_batch->capacity());
+    if (num_rows > 0) {
+      RETURN_IF_ERROR(SplitMiniBatches(child_batch_.get(), &mini_batches));
+
+      for (int i = 0; i < num_tuples; ++i) {
+        RowBatch* mini_batch = mini_batches[i].get();
+        if (mini_batch->num_rows() > 0) {
+          bool eos;
+          RETURN_IF_ERROR(
+              aggs_[i]->AddBatchStreaming(state, out_batch, mini_batch, &eos));
+          // child_batch_'s size is <= out_batch's capacity, so even under high memory
+          // pressure where all rows are streamed through, we will be able to process
+          // the entire input. Note that unlike the single agg case above, this works
+          // because any node that hits this path must have been preceded by an
+          // aggregation node in 'replicate_input_' mode, and so all memory pointed to by
+          // 'child_batch_' will be attached to 'child_batch_'.
+          DCHECK(eos);
+          mini_batch->Reset();
+        }
+      }
+    }
+    child_batch_processed_ = true;
+    child_batch_->Reset();
   } while (out_batch->num_rows() == 0 && !child_eos_);
 
   if (child_eos_) {
     child(0)->Close(state);
     child_batch_.reset();
-    RETURN_IF_ERROR(aggregator_->InputDone());
+    for (auto& agg : aggs_) RETURN_IF_ERROR(agg->InputDone());
   }
 
   return Status::OK();
@@ -138,7 +190,7 @@ void StreamingAggregationNode::Close(RuntimeState* state) {
   // All expr mem allocations should happen in the Aggregator.
   DCHECK(expr_results_pool() == nullptr
       || expr_results_pool()->total_allocated_bytes() == 0);
-  aggregator_->Close(state);
+  for (auto& agg : aggs_) agg->Close(state);
   child_batch_.reset();
   ExecNode::Close(state);
 }
@@ -146,8 +198,8 @@ void StreamingAggregationNode::Close(RuntimeState* state) {
 void StreamingAggregationNode::DebugString(
     int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
-  *out << "StreamingAggregationNode("
-       << "aggregator=" << aggregator_->DebugString();
+  *out << "StreamingAggregationNode(";
+  for (auto& agg : aggs_) agg->DebugString(indentation_level, out);
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
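
A compact sketch of the output phase added to GetNext() above: once the child is
exhausted, the node drains one aggregator at a time, advancing 'curr_output_agg_idx_'
whenever the current one reports eos; the node itself is at eos only after every
aggregator has been drained. ToyAggregator and DrainAll are hypothetical stand-ins, not
the real Aggregator interface.

  #include <cassert>
  #include <cstddef>
  #include <vector>

  // Toy stand-in: each "aggregator" simply exposes a fixed number of output rows.
  struct ToyAggregator {
    int rows_left;
    // Returns up to 'capacity' rows and sets *eos once everything has been produced.
    int GetNext(int capacity, bool* eos) {
      int n = rows_left < capacity ? rows_left : capacity;
      rows_left -= n;
      *eos = rows_left == 0;
      return n;
    }
  };

  // Drains aggs[0], then aggs[1], ... exactly like the curr_output_agg_idx_ loop.
  int DrainAll(std::vector<ToyAggregator>& aggs) {
    std::size_t curr_output_agg_idx = 0;
    int total_rows = 0;
    while (curr_output_agg_idx < aggs.size()) {
      bool aggregator_eos = false;
      total_rows += aggs[curr_output_agg_idx].GetNext(/*capacity=*/1024, &aggregator_eos);
      if (aggregator_eos) ++curr_output_agg_idx;
    }
    return total_rows;
  }

  int main() {
    std::vector<ToyAggregator> aggs = {{3000}, {10}, {0}};
    assert(DrainAll(aggs) == 3010);
    return 0;
  }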

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/streaming-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h
index 8e06b2a..71193b9 100644
--- a/be/src/exec/streaming-aggregation-node.h
+++ b/be/src/exec/streaming-aggregation-node.h
@@ -20,8 +20,7 @@
 
 #include <memory>
 
-#include "exec/exec-node.h"
-#include "exec/grouping-aggregator.h"
+#include "exec/aggregation-node-base.h"
 
 namespace impala {
 
@@ -44,14 +43,11 @@ class RuntimeState;
 /// the final aggregation.
 ///
 /// This node only supports grouping aggregations.
-class StreamingAggregationNode : public ExecNode {
+class StreamingAggregationNode : public AggregationNodeBase {
  public:
   StreamingAggregationNode(
       ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
-  virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual Status Reset(RuntimeState* state) override;
@@ -63,14 +59,21 @@ class StreamingAggregationNode : public ExecNode {
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// Row batch used as argument to GetNext() for the child node preaggregations. Store
-  /// in node to avoid reallocating for every GetNext() call when streaming.
+  /// Row batch retrieved from the child and passed to Aggregators in GetNext(). Stored
+  /// here as we may need it in multiple GetNext() calls if we're streaming rows through.
   std::unique_ptr<RowBatch> child_batch_;
 
+  /// If true, there are no more rows in 'child_batch_' that need to be passed to any
+  /// Aggregator, and the next call to GetNext() will retrieve another batch, unless
+  /// 'child_eos_' is true.
+  bool child_batch_processed_ = true;
+
   /// True if no more rows to process from child.
-  bool child_eos_;
+  bool child_eos_ = false;
 
-  std::unique_ptr<GroupingAggregator> aggregator_;
+  /// If 'replicate_input_' is true, the index in 'aggs_' of the next Aggregator to pass
+  /// 'child_batch_' into.
+  int32_t replicate_agg_idx_ = 0;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 75312b5..8b270f4 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -63,6 +63,7 @@ add_library(Exprs
   udf-builtins-ir.cc
   utility-functions.cc
   utility-functions-ir.cc
+  valid-tuple-id.cc
 )
 add_dependencies(Exprs gen-deps gen_ir_descriptions)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 33533f5..bc2bc0e 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1690,6 +1690,43 @@ BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx,
   return round(scaled_extrap_ndv * ndv_scale);
 }
 
+template <typename T>
+void AggregateFunctions::AggIfUpdate(
+    FunctionContext* ctx, const BooleanVal& cond, const T& src, T* dst) {
+  DCHECK(!cond.is_null);
+  if (cond.val) *dst = src;
+}
+
+template <>
+void AggregateFunctions::AggIfUpdate(
+    FunctionContext* ctx, const BooleanVal& cond, const StringVal& src, StringVal* dst) {
+  DCHECK(!cond.is_null);
+  if (cond.val) CopyStringVal(ctx, src, dst);
+}
+
+template <typename T>
+void AggregateFunctions::AggIfMerge(FunctionContext*, const T& src, T* dst) {
+  *dst = src;
+}
+
+template <>
+void AggregateFunctions::AggIfMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  CopyStringVal(ctx, src, dst);
+}
+
+template <typename T>
+T AggregateFunctions::AggIfFinalize(FunctionContext*, const T& src) {
+  return src;
+}
+
+template <>
+StringVal AggregateFunctions::AggIfFinalize(FunctionContext* ctx, const StringVal& src) {
+  StringVal result = StringValGetValue(ctx, src);
+  if (!src.is_null) ctx->Free(src.ptr);
+  return result;
+}
+
 // An implementation of a simple single pass variance algorithm. A standard UDA must
 // be single pass (i.e. does not scan the table more than once), so the most canonical
 // two pass approach is not practical.
@@ -2351,6 +2388,62 @@ template void AggregateFunctions::SampledNdvUpdate(
 template void AggregateFunctions::SampledNdvUpdate(
     FunctionContext*, const DecimalVal&, const DoubleVal&, StringVal*);
 
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const BooleanVal& src, BooleanVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const TinyIntVal& src, TinyIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const SmallIntVal& src, SmallIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const IntVal& src, IntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const BigIntVal& src, BigIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const FloatVal& src, FloatVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const DoubleVal& src, DoubleVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const TimestampVal& src, TimestampVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const DecimalVal& src, DecimalVal* dst);
+
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const IntVal& src, IntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const FloatVal& src, FloatVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
+
+template BooleanVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const BooleanVal& src);
+template TinyIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const TinyIntVal& src);
+template SmallIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const SmallIntVal& src);
+template IntVal AggregateFunctions::AggIfFinalize(FunctionContext*, const IntVal& src);
+template BigIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const BigIntVal& src);
+template FloatVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const FloatVal& src);
+template DoubleVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const DoubleVal& src);
+template TimestampVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const TimestampVal& src);
+template DecimalVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const DecimalVal& src);
+
 template void AggregateFunctions::KnuthVarUpdate(
     FunctionContext*, const TinyIntVal&, StringVal*);
 template void AggregateFunctions::KnuthVarUpdate(

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/aggregate-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 243e9f7..e3c1425 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -221,6 +221,16 @@ class AggregateFunctions {
   static void SampledNdvMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static BigIntVal SampledNdvFinalize(FunctionContext*, const StringVal& src);
 
+  /// The AGGIF(predicate, expr) function returns 'expr' if 'predicate' is true.
+  /// It is expected that 'predicate' only returns true for a single row per group.
+  /// The predicate must not evaluate to NULL.
+  template <typename T>
+  static void AggIfUpdate(FunctionContext*, const BooleanVal& cond, const T& src, T* dst);
+  template <typename T>
+  static void AggIfMerge(FunctionContext*, const T& src, T* dst);
+  template <typename T>
+  static T AggIfFinalize(FunctionContext*, const T& src);
+
   /// Knuth's variance algorithm, more numerically stable than canonical stddev
   /// algorithms; reference implementation:
   /// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm
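
To make the AGGIF semantics above concrete, a toy model using plain C++ values in place
of FunctionContext and the *Val types: Update captures 'src' only on the row where the
predicate holds, Merge in the patch copies the intermediate value through unchanged
(safe because the predicate is expected to hold for a single row per group), and
Finalize returns the captured value. Presumably the planner's transposition step pairs
AGGIF with ValidTupleId to pick out the row produced by each aggregation class, but that
usage is not part of this diff.

  #include <cassert>
  #include <optional>

  // Toy intermediate state; the real builtin tracks NULL via the *Val types instead.
  using AggIfState = std::optional<int>;

  void AggIfUpdate(bool cond, int src, AggIfState* dst) {
    if (cond) *dst = src;  // only the matching row contributes
  }

  int AggIfFinalize(const AggIfState& state) {
    return state.value_or(0);  // the real Finalize returns NULL if nothing matched
  }

  int main() {
    AggIfState state;
    AggIfUpdate(false, 1, &state);
    AggIfUpdate(true, 42, &state);  // the single row where the predicate is true
    AggIfUpdate(false, 7, &state);
    assert(AggIfFinalize(state) == 42);
    return 0;
  }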

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/scalar-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index 42c2867..00e6709 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -44,6 +44,7 @@
 #include "exprs/tuple-is-null-predicate.h"
 #include "exprs/udf-builtins.h"
 #include "exprs/utility-functions.h"
+#include "exprs/valid-tuple-id.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
@@ -186,6 +187,9 @@ Status ScalarExpr::CreateNode(
     case TExprNodeType::KUDU_PARTITION_EXPR:
       *expr = pool->Add(new KuduPartitionExpr(texpr_node));
       return Status::OK();
+    case TExprNodeType::VALID_TUPLE_ID_EXPR:
+      *expr = pool->Add(new ValidTupleIdExpr(texpr_node));
+      return Status::OK();
     default:
       *expr = nullptr;
       stringstream os;

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/valid-tuple-id.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/valid-tuple-id.cc b/be/src/exprs/valid-tuple-id.cc
new file mode 100644
index 0000000..84cdf2f
--- /dev/null
+++ b/be/src/exprs/valid-tuple-id.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/valid-tuple-id.h"
+
+#include <sstream>
+
+#include "gen-cpp/Exprs_types.h"
+
+#include "common/names.h"
+#include "runtime/descriptors.h"
+#include "runtime/tuple-row.h"
+
+namespace impala {
+
+ValidTupleIdExpr::ValidTupleIdExpr(const TExprNode& node) : ScalarExpr(node) {}
+
+Status ValidTupleIdExpr::Init(const RowDescriptor& row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
+  DCHECK_EQ(0, children_.size());
+  tuple_ids_.reserve(row_desc.tuple_descriptors().size());
+  for (TupleDescriptor* tuple_desc : row_desc.tuple_descriptors()) {
+    tuple_ids_.push_back(tuple_desc->id());
+  }
+  return Status::OK();
+}
+
+int ValidTupleIdExpr::ComputeNonNullCount(const TupleRow* row) const {
+  int num_tuples = tuple_ids_.size();
+  int non_null_count = 0;
+  for (int i = 0; i < num_tuples; ++i) non_null_count += (row->GetTuple(i) != nullptr);
+  return non_null_count;
+}
+
+IntVal ValidTupleIdExpr::GetIntVal(ScalarExprEvaluator* eval, const TupleRow* row) const {
+  // Validate that exactly one tuple is non-NULL.
+  DCHECK_EQ(1, ComputeNonNullCount(row));
+  int num_tuples = tuple_ids_.size();
+  for (int i = 0; i < num_tuples; ++i) {
+    if (row->GetTuple(i) != nullptr) return IntVal(tuple_ids_[i]);
+  }
+  return IntVal::null();
+}
+
+Status ValidTupleIdExpr::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) {
+  return GetCodegendComputeFnWrapper(codegen, fn);
+}
+
+string ValidTupleIdExpr::DebugString() const {
+  return "ValidTupleId()";
+}
+
+} // namespace impala
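
A self-contained sketch of what ValidTupleIdExpr::GetIntVal() computes: given the row's
tuple pointers and the tuple id of each position in the row descriptor, return the id of
the single non-null tuple. The free function below is an illustrative toy, not the real
expression class.

  #include <cassert>
  #include <cstddef>
  #include <vector>

  int ValidTupleId(const std::vector<const void*>& tuples,
      const std::vector<int>& tuple_ids) {
    // The real expr DCHECKs that exactly one tuple in the row is non-NULL.
    int non_null_count = 0;
    for (const void* t : tuples) non_null_count += (t != nullptr);
    assert(non_null_count == 1);
    for (std::size_t i = 0; i < tuples.size(); ++i) {
      if (tuples[i] != nullptr) return tuple_ids[i];
    }
    return -1;  // unreachable for valid input; the real expr returns a NULL IntVal
  }

  int main() {
    int dummy = 0;
    // Three aggregation classes with tuple ids 5, 6 and 7; only the second tuple is set.
    assert(ValidTupleId({nullptr, &dummy, nullptr}, {5, 6, 7}) == 6);
    return 0;
  }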

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/valid-tuple-id.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/valid-tuple-id.h b/be/src/exprs/valid-tuple-id.h
new file mode 100644
index 0000000..7eb03d9
--- /dev/null
+++ b/be/src/exprs/valid-tuple-id.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXPRS_VALID_TUPLE_ID_H_
+#define IMPALA_EXPRS_VALID_TUPLE_ID_H_
+
+#include "exprs/scalar-expr.h"
+
+namespace impala {
+
+class TExprNode;
+
+/// Returns the tuple id of the single non-NULL tuple in the input row.
+/// Valid input rows must have exactly one non-NULL tuple.
+class ValidTupleIdExpr : public ScalarExpr {
+ protected:
+  friend class ScalarExpr;
+
+  ValidTupleIdExpr(const TExprNode& node);
+
+  virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) override;
+  virtual Status GetCodegendComputeFn(
+      LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
+  virtual std::string DebugString() const override;
+
+  virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const override;
+
+ private:
+  /// Maps from tuple index in the row to its corresponding tuple id.
+  std::vector<TupleId> tuple_ids_;
+
+  /// Returns the number of tuples in 'row' that are non-null. Used for debugging.
+  int ComputeNonNullCount(const TupleRow* row) const;
+};
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 90d8c4d..1334858 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -242,6 +242,10 @@ class RowBatch {
       return Get();
     }
 
+    /// Returns the index in the RowBatch of the current row. This does an integer
+    /// division and so should not be used in hot inner loops.
+    int RowNum() { return (row_ - parent_->tuple_ptrs_) / num_tuples_per_row_; }
+
     /// Returns true if the iterator is beyond the last row for read iterators.
     /// Useful for read iterators to determine the limit. Write iterators should use
     /// RowBatch::AtCapacity() instead.
@@ -334,6 +338,10 @@ class RowBatch {
         num_rows * num_tuples_per_row_ * sizeof(Tuple*));
   }
 
+  void ClearTuplePointers() {
+    memset(tuple_ptrs_, 0, capacity_ * num_tuples_per_row_ * sizeof(Tuple*));
+  }
+
   void ClearRow(TupleRow* row) {
     memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
   }
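
For comparison with the existing ClearRow(), a toy illustration of the new
ClearTuplePointers(): both just zero tuple pointers, but ClearTuplePointers() covers the
whole batch (capacity_ rows), which is what StreamingAggregationNode relies on when more
than one Aggregator writes into the same output batch. ToyRowBatch is a simplified
stand-in, not the real RowBatch.

  #include <cassert>
  #include <cstring>
  #include <vector>

  struct ToyRowBatch {
    int capacity;
    int num_tuples_per_row;
    std::vector<void*> tuple_ptrs;  // capacity * num_tuples_per_row entries

    // Zero a single row's tuple pointers (like RowBatch::ClearRow()).
    void ClearRow(void** row) { std::memset(row, 0, num_tuples_per_row * sizeof(void*)); }

    // Zero every tuple pointer in the batch (like the new ClearTuplePointers()).
    void ClearTuplePointers() {
      std::memset(tuple_ptrs.data(), 0, capacity * num_tuples_per_row * sizeof(void*));
    }
  };

  int main() {
    int dummy = 0;
    ToyRowBatch batch{2, 3, std::vector<void*>(6, &dummy)};
    batch.ClearTuplePointers();
    for (void* p : batch.tuple_ptrs) assert(p == nullptr);
    return 0;
  }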

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/common/thrift/Exprs.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Exprs.thrift b/common/thrift/Exprs.thrift
index ee9fe94..584e525 100644
--- a/common/thrift/Exprs.thrift
+++ b/common/thrift/Exprs.thrift
@@ -38,7 +38,8 @@ enum TExprNodeType {
   FUNCTION_CALL,
   AGGREGATE_EXPR,
   IS_NOT_EMPTY_PRED,
-  KUDU_PARTITION_EXPR
+  KUDU_PARTITION_EXPR,
+  VALID_TUPLE_ID_EXPR
 }
 
 struct TBoolLiteral {

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e116c3b..5d245fb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -47,7 +47,8 @@ enum TPlanNodeType {
   UNNEST_NODE,
   SUBPLAN_NODE,
   KUDU_SCAN_NODE,
-  CARDINALITY_CHECK_NODE
+  CARDINALITY_CHECK_NODE,
+  MULTI_AGGREGATION_NODE
 }
 
 // phases of an execution node
@@ -352,7 +353,26 @@ struct TNestedLoopJoinNode {
   2: optional list<Exprs.TExpr> join_conjuncts
 }
 
-struct TAggregationNode {
+// This contains all of the information computed by the plan as part of the resource
+// profile that is needed by the backend to execute.
+struct TBackendResourceProfile {
+  // The minimum reservation for this plan node in bytes.
+  1: required i64 min_reservation
+
+  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
+  // unlimited.
+  2: required i64 max_reservation
+
+  // The spillable buffer size in bytes to use for this node, chosen by the planner.
+  // Set iff the node uses spillable buffers.
+  3: optional i64 spillable_buffer_size
+
+  // The buffer size in bytes that is large enough to fit the largest row to be processed.
+  // Set if the node allocates buffers for rows from the buffer pool.
+  4: optional i64 max_row_buffer_size
+}
+
+struct TAggregator {
   1: optional list<Exprs.TExpr> grouping_exprs
   // aggregate exprs. The root of each expr is the aggregate function. The
   // other exprs are the inputs to the aggregate function.
@@ -366,14 +386,25 @@ struct TAggregationNode {
   // aggregate functions.
   4: required Types.TTupleId output_tuple_id
 
-  // Set to true if this aggregation node needs to run the finalization step.
+  // Set to true if this aggregator needs to run the finalization step.
   5: required bool need_finalize
 
   // Set to true to use the streaming preagg algorithm. Node must be a preaggregation.
   6: required bool use_streaming_preaggregation
 
-  // Estimate of number of input rows from the planner.
-  7: required i64 estimated_input_cardinality
+  7: required TBackendResourceProfile resource_profile
+}
+
+struct TAggregationNode {
+  // Aggregators for this node, each with a unique set of grouping exprs.
+  1: required list<TAggregator> aggregators
+
+  // Used in streaming aggregations to determine how much memory to use.
+  2: required i64 estimated_input_cardinality
+
+  // If true, this is the first AggregationNode in an aggregation plan with multiple
+  // Aggregators and the entire input to this node should be passed to each Aggregator.
+  3: required bool replicate_input
 }
 
 struct TSortInfo {
@@ -522,25 +553,6 @@ struct TUnnestNode {
   1: required Exprs.TExpr collection_expr
 }
 
-// This contains all of the information computed by the plan as part of the resource
-// profile that is needed by the backend to execute.
-struct TBackendResourceProfile {
-  // The minimum reservation for this plan node in bytes.
-  1: required i64 min_reservation
-
-  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
-  // unlimited.
-  2: required i64 max_reservation
-
-  // The spillable buffer size in bytes to use for this node, chosen by the planner.
-  // Set iff the node uses spillable buffers.
-  3: optional i64 spillable_buffer_size
-
-  // The buffer size in bytes that is large enough to fit the largest row to be processed.
-  // Set if the node allocates buffers for rows from the buffer pool.
-  4: optional i64 max_row_buffer_size
-}
-
 struct TCardinalityCheckNode {
   // Associated statement of child
   1: required string display_statement
