http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc 
b/be/src/exec/grouping-aggregator.cc
new file mode 100644
index 0000000..60001a6
--- /dev/null
+++ b/be/src/exec/grouping-aggregator.cc
@@ -0,0 +1,1098 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/grouping-aggregator.h"
+
+#include <sstream>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exec/hash-table.inline.h"
+#include "exprs/agg-fn-evaluator.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/slot-ref.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
+/// in a streaming preaggregation, given that the hash tables are currently the given
+/// size or above. The sizes roughly correspond to hash table sizes where the bucket
+/// arrays will fit in a cache level. Intuitively, we don't want the working set of the
+/// aggregation to expand to the next level of cache unless we're reducing the input
+/// enough to outweigh the increased memory latency we'll incur for each hash table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of the
+/// final reduction. It may be biased either way depending on the ordering of the
+/// input. If the input order is random, we will underestimate the final reduction
+/// factor because the probability of a row having the same key as a previous row
+/// increases as more input is processed.  If the input order is correlated with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final reduction
+/// using the planner's estimated input cardinality and the assumption that input
+/// is in a random order. This means that we assume that the reduction factor will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+  // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories
+  // in bytes is at least this threshold.
+  int min_ht_mem;
+  // The minimum reduction factor to expand the hash tables.
+  double streaming_ht_min_reduction;
+};
+
+// Reduction thresholds keyed by hash table memory footprint, listed in ascending
+// order of 'min_ht_mem'.
+// TODO: experimentally tune these values and also programmatically get the cache size
+// of the machine that we're running on.
+static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+    {0, 0.0}, // Expand up to L2 cache always.
+    {256 * 1024, 1.1}, // Expand into L3 cache if we look like we're getting some reduction.
+    {2 * 1024 * 1024, 2.0}, // Expand into main memory only for a significant reduction.
+};
+
+// Number of entries in STREAMING_HT_MIN_REDUCTION.
+static const int STREAMING_HT_MIN_REDUCTION_SIZE =
+    sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(*STREAMING_HT_MIN_REDUCTION);
+
+// Constructs a grouping aggregator for the plan node 'tnode'. Settings are copied from
+// 'tnode' (streaming mode, estimated input cardinality) and from 'exec_node' (resource
+// profile, limit, subplan membership). Every profile counter/timer pointer and both
+// codegen'd function pointers start out null.
+GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+    const TPlanNode& tnode, const DescriptorTbl& descs)
+  : Aggregator(exec_node, pool, tnode, descs, "GroupingAggregator"),
+    intermediate_row_desc_(intermediate_tuple_desc_, false),
+    is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
+    needs_serialize_(false),
+    output_partition_(nullptr),
+    resource_profile_(exec_node->resource_profile()),
+    num_input_rows_(0),
+    is_in_subplan_(exec_node->IsInSubplan()),
+    limit_(exec_node->limit()),
+    add_batch_impl_fn_(nullptr),
+    add_batch_streaming_impl_fn_(nullptr),
+    ht_resize_timer_(nullptr),
+    get_results_timer_(nullptr),
+    num_hash_buckets_(nullptr),
+    partitions_created_(nullptr),
+    max_partition_level_(nullptr),
+    num_row_repartitioned_(nullptr),
+    num_repartitions_(nullptr),
+    num_spilled_partitions_(nullptr),
+    largest_partition_percent_(nullptr),
+    streaming_timer_(nullptr),
+    num_passthrough_rows_(nullptr),
+    preagg_estimated_reduction_(nullptr),
+    preagg_streaming_ht_min_reduction_(nullptr),
+    estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
+    partition_eos_(false),
+    partition_pool_(new ObjectPool()) {
+  // The fanout must be exactly 2^NUM_PARTITIONING_BITS.
+  DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
+}
+
+// Creates the grouping exprs from 'tnode', builds one SlotRef "build" expr per grouping
+// expr over the intermediate tuple, records which grouping exprs are var-len strings,
+// and determines whether any aggregate function supports serialization.
+Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(ScalarExpr::Create(
+      tnode.agg_node.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
+
+  // Construct build exprs from intermediate_row_desc_. Grouping slots occupy the first
+  // grouping_exprs_.size() slots of the intermediate tuple.
+  const int num_grouping_exprs = grouping_exprs_.size();
+  for (int idx = 0; idx < num_grouping_exprs; ++idx) {
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[idx];
+    DCHECK(slot_desc->type().type == TYPE_NULL
+        || slot_desc->type() == grouping_exprs_[idx]->type());
+    // Hack to avoid TYPE_NULL SlotRefs: substitute TYPE_BOOLEAN for such slots.
+    SlotRef* slot_ref = nullptr;
+    if (slot_desc->type().type == TYPE_NULL) {
+      slot_ref = pool_->Add(new SlotRef(slot_desc, TYPE_BOOLEAN));
+    } else {
+      slot_ref = pool_->Add(new SlotRef(slot_desc));
+    }
+    build_exprs_.push_back(slot_ref);
+    RETURN_IF_ERROR(slot_ref->Init(intermediate_row_desc_, state));
+    if (slot_ref->type().IsVarLenStringType()) string_grouping_exprs_.push_back(idx);
+  }
+
+  RETURN_IF_ERROR(Aggregator::Init(tnode, state));
+  for (const auto& agg_fn : agg_fns_) {
+    if (agg_fn->SupportsSerialize()) needs_serialize_ = true;
+  }
+  return Status::OK();
+}
+
+// Prepares the base aggregator, allocates the tuple pool, registers the profile
+// counters that apply to the current mode (streaming preaggregation or not), creates
+// the hash table context and initializes the reservation manager.
+Status GroupingAggregator::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(Aggregator::Prepare(state));
+  state_ = state;
+  tuple_pool_.reset(new MemPool(mem_tracker_.get()));
+
+  // Counters registered in both modes.
+  ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
+  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
+  num_hash_buckets_ = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
+  partitions_created_ = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
+  largest_partition_percent_ =
+      runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
+
+  if (!is_streaming_preagg_) {
+    // Counters that are only registered when not running as a streaming preagg.
+    num_row_repartitioned_ =
+        ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
+    num_repartitions_ = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
+    num_spilled_partitions_ =
+        ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
+    max_partition_level_ =
+        runtime_profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
+  } else {
+    // Counters that are only registered for a streaming preaggregation.
+    runtime_profile()->AppendExecOption("Streaming Preaggregation");
+    streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime");
+    num_passthrough_rows_ =
+        ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT);
+    preagg_estimated_reduction_ =
+        ADD_COUNTER(runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE);
+    preagg_streaming_ht_min_reduction_ = ADD_COUNTER(
+        runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE);
+  }
+
+  // One 'true' flag per build expr.
+  const vector<bool> all_true(build_exprs_.size(), true);
+  RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, grouping_exprs_, true,
+      all_true, state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1,
+      expr_perm_pool_.get(), expr_results_pool_.get(), expr_results_pool_.get(),
+      &ht_ctx_));
+
+  const string client_name = Substitute("GroupingAggregator id=$0 ptr=$1", id_, this);
+  reservation_manager_.Init(client_name, runtime_profile_, mem_tracker_.get(),
+      resource_profile_, debug_options_);
+  return Status::OK();
+}
+
+// Codegens the batch-processing function that matches the aggregation mode and records
+// the outcome in the runtime profile.
+void GroupingAggregator::Codegen(RuntimeState* state) {
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  const TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+
+  Status codegen_status;
+  if (is_streaming_preagg_) {
+    codegen_status = CodegenAddBatchStreamingImpl(codegen, prefetch_mode);
+  } else {
+    codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode);
+  }
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+}
+
+// Opens the aggregator: claims the buffer reservation if the client is not yet
+// registered, opens the hash table context, lazily creates 'ht_allocator_' (and
+// 'serialize_stream_' when needed) and creates the level-0 hash partitions.
+Status GroupingAggregator::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(Aggregator::Open(state));
+
+  // Claim reservation after the child has been opened to reduce the peak reservation
+  // requirement.
+  if (!buffer_pool_client()->is_registered()) {
+    DCHECK_GE(resource_profile_.min_reservation, MinReservation());
+    RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state));
+  }
+
+  DCHECK(ht_ctx_.get() != nullptr);
+  RETURN_IF_ERROR(ht_ctx_->Open(state));
+
+  // 'ht_allocator_' and 'serialize_stream_' are only allocated on the first Open() call.
+  const bool first_open = ht_allocator_ == nullptr;
+  if (first_open) {
+    ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
+        buffer_pool_client(), resource_profile_.spillable_buffer_size));
+
+    if (needs_serialize_ && !is_streaming_preagg_) {
+      serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
+          buffer_pool_client(), resource_profile_.spillable_buffer_size,
+          resource_profile_.max_row_buffer_size));
+      RETURN_IF_ERROR(serialize_stream_->Init(id_, false));
+      // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
+      // another buffer during spilling.
+      bool got_buffer;
+      RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
+      DCHECK(got_buffer) << "Accounted in min reservation"
+                         << buffer_pool_client()->DebugString();
+      DCHECK(serialize_stream_->has_write_iterator());
+    }
+  }
+  return CreateHashPartitions(0);
+}
+
+// Fetches more output rows into 'row_batch' unless end-of-stream was already reached,
+// and reports the end-of-stream state through 'eos'.
+Status GroupingAggregator::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  if (partition_eos_) {
+    *eos = true;
+    return Status::OK();
+  }
+  RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
+  *eos = partition_eos_;
+  return Status::OK();
+}
+
+// Appends output rows from the current output partition to 'row_batch', moving on to
+// the next partition first if the current one is exhausted. Sets 'partition_eos_' when
+// no partitions remain or the limit has been reached.
+Status GroupingAggregator::GetRowsFromPartition(
+    RuntimeState* state, RowBatch* row_batch) {
+  DCHECK(!row_batch->AtCapacity());
+  if (output_iterator_.AtEnd()) {
+    // The current partition (if there is one) is exhausted: close it.
+    if (output_partition_ != nullptr) {
+      output_partition_->Close(false);
+      output_partition_ = nullptr;
+    }
+    const bool no_partitions_left =
+        aggregated_partitions_.empty() && spilled_partitions_.empty();
+    if (no_partitions_left) {
+      partition_eos_ = true;
+      return Status::OK();
+    }
+    // There is more to output: advance to the next partition.
+    RETURN_IF_ERROR(NextPartition());
+    DCHECK(output_partition_ != nullptr);
+  }
+
+  SCOPED_TIMER(get_results_timer_);
+
+  // The output row batch may reference memory allocated by Serialize() or Finalize(),
+  // allocating that memory directly from the row batch's pool means we can safely return
+  // the batch.
+  vector<ScopedResultsPool> allocate_from_batch_pool = ScopedResultsPool::Create(
+      output_partition_->agg_fn_evals, row_batch->tuple_data_pool());
+
+  // The loop below can go on for a long time if the conjuncts are very selective, so
+  // query maintenance is done once every 'maintenance_interval' iterations.
+  const int maintenance_interval = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
+  for (int iter = 0; !output_iterator_.AtEnd() && !row_batch->AtCapacity(); ++iter) {
+    if ((iter & (maintenance_interval - 1)) == 0) {
+      RETURN_IF_CANCELLED(state);
+      RETURN_IF_ERROR(QueryMaintenance(state));
+    }
+
+    const int row_idx = row_batch->AddRow();
+    TupleRow* out_row = row_batch->GetRow(row_idx);
+    Tuple* intermediate_tuple = output_iterator_.GetTuple();
+    Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals,
+        intermediate_tuple, row_batch->tuple_data_pool());
+    output_iterator_.Next();
+    out_row->SetTuple(0, output_tuple);
+    DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+    // Only rows that pass the conjuncts are committed to the batch.
+    if (!ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), out_row)) {
+      continue;
+    }
+    row_batch->CommitLastRow();
+    ++num_rows_returned_;
+    if (ReachedLimit()) break;
+  }
+
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  partition_eos_ = ReachedLimit();
+  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
+
+  return Status::OK();
+}
+
+// Returns true if the preaggregation's hash tables should be allowed to grow. The
+// decision compares an estimate of the final reduction factor against the minimum
+// reduction required by STREAMING_HT_MIN_REDUCTION for the current total hash table
+// memory. Returns true unconditionally when there are no rows in the hash tables yet or
+// when the count of aggregated input rows is not positive. Updates the
+// 'preagg_estimated_reduction_' and 'preagg_streaming_ht_min_reduction_' counters.
+bool GroupingAggregator::ShouldExpandPreaggHashTables() const {
+  int64_t ht_mem = 0;
+  int64_t ht_rows = 0;
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    HashTable* ht = hash_partitions_[i]->hash_tbl.get();
+    ht_mem += ht->CurrentMemSize();
+    ht_rows += ht->size();
+  }
+
+  // Need some rows in tables to have valid statistics.
+  if (ht_rows == 0) return true;
+
+  // Find the appropriate reduction factor in our table for the current hash table sizes:
+  // the last entry whose 'min_ht_mem' does not exceed 'ht_mem'.
+  int cache_level = 0;
+  while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE
+      && ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+    ++cache_level;
+  }
+
+  // Compare the number of rows in the hash table with the number of input rows that
+  // were aggregated into it. Exclude passed through rows from this calculation since
+  // they were not in hash tables.
+  const int64_t aggregated_input_rows = num_input_rows_ - num_rows_returned_;
+  const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
+
+  // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
+  // inaccurate, which could lead to a divide by zero below.
+  if (aggregated_input_rows <= 0) return true;
+
+  const double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
+
+  // Extrapolate the current reduction factor (r) using the formula
+  // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
+  // set, N is the number of input rows, excluding passed-through rows, and n is the
+  // number of rows inserted or merged into the hash tables. This is a very rough
+  // approximation but is good enough to be useful.
+  // N / n must be computed in floating point: integer division would truncate the ratio
+  // (e.g. 1.9 -> 1) and systematically underestimate the reduction.
+  // TODO: consider collecting more statistics to better estimate reduction.
+  const double estimated_reduction = aggregated_input_rows >= expected_input_rows ?
+      current_reduction :
+      1 + (static_cast<double>(expected_input_rows) / aggregated_input_rows)
+              * (current_reduction - 1);
+  const double min_reduction =
+      STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+
+  COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
+  COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
+  return estimated_reduction > min_reduction;
+}
+
+// Walks the rows remaining from 'it' and calls Finalize() (if 'needs_finalize_') or
+// otherwise Serialize() on each of them in order to free any memory allocated by UDAs.
+// Does nothing if neither finalization nor serialization is needed.
+void GroupingAggregator::CleanupHashTbl(
+    const vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it) {
+  if (!needs_finalize_ && !needs_serialize_) return;
+
+  // Finalize() requires a dst tuple but we don't actually need the result, so a single
+  // dummy tuple is allocated up front and reused to avoid accumulating memory.
+  Tuple* dummy_dst = nullptr;
+  if (needs_finalize_) {
+    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get());
+  }
+
+  while (!it.AtEnd()) {
+    Tuple* current = it.GetTuple();
+    if (needs_finalize_) {
+      AggFnEvaluator::Finalize(agg_fn_evals, current, dummy_dst);
+    } else {
+      AggFnEvaluator::Serialize(agg_fn_evals, current);
+    }
+    it.Next();
+    // Free any expr result allocations to prevent them accumulating excessively.
+    expr_results_pool_->Clear();
+  }
+}
+
+// Resets the aggregator so that it can be re-executed: clears the end-of-stream flag,
+// puts the hash table context back at level 0 and closes all partitions. Must not be
+// called on a streaming preaggregation. 'state' is not used here.
+Status GroupingAggregator::Reset(RuntimeState* state) {
+  DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
+  partition_eos_ = false;
+  // Reset the HT and the partitions for this grouping agg.
+  ht_ctx_->set_level(0);
+  ClosePartitions();
+  return Status::OK();
+}
+
+// Releases everything held by the aggregator: the in-progress output partition, all
+// other partitions, the tuple pool, the hash table context, the serialize stream, the
+// exprs and the reservation, and finally closes the base Aggregator.
+void GroupingAggregator::Close(RuntimeState* state) {
+  if (output_partition_ != nullptr) {
+    // Call Serialize/Finalize on the rows that were not yet returned in order to free
+    // any memory allocated by UDAs.
+    CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);
+    output_partition_->Close(false);
+  }
+
+  ClosePartitions();
+
+  if (tuple_pool_.get() != nullptr) {
+    tuple_pool_->FreeAll();
+  }
+  if (ht_ctx_.get() != nullptr) {
+    ht_ctx_->Close(state);
+  }
+  ht_ctx_.reset();
+  if (serialize_stream_.get() != nullptr) {
+    serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  ScalarExpr::Close(grouping_exprs_);
+  ScalarExpr::Close(build_exprs_);
+
+  reservation_manager_.Close(state);
+  // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
+  Aggregator::Close(state);
+}
+
+// Aggregates the rows of 'batch', using the codegen'd implementation when one is
+// available and the interpreted AddBatchImpl<false>() otherwise.
+Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(build_timer_);
+  num_input_rows_ += batch->num_rows();
+
+  const TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  if (add_batch_impl_fn_ == nullptr) {
+    return AddBatchImpl<false>(batch, prefetch_mode, ht_ctx_.get());
+  }
+  return add_batch_impl_fn_(this, batch, prefetch_mode, ht_ctx_.get());
+}
+
+// Streaming preaggregation entry point: processes 'child_batch', writing output rows
+// to 'out_batch'. Hash tables are only resized beforehand if at least one of them lacks
+// room for the whole batch and ShouldExpandPreaggHashTables() approves.
+Status GroupingAggregator::AddBatchStreaming(
+    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch) {
+  SCOPED_TIMER(streaming_timer_);
+  const int num_child_rows = child_batch->num_rows();
+  num_input_rows_ += num_child_rows;
+
+  // Record how many inserts each hash table can take before it needs to be resized.
+  int remaining_capacity[PARTITION_FANOUT];
+  bool ht_needs_expansion = false;
+  for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+    remaining_capacity[idx] = GetHashTable(idx)->NumInsertsBeforeResize();
+    if (remaining_capacity[idx] < num_child_rows) ht_needs_expansion = true;
+  }
+
+  // Stop expanding hash tables if we're not reducing the input sufficiently. As our
+  // hash tables expand out of each level of cache hierarchy, every hash table lookup
+  // will take longer. We also may not be able to expand hash tables because of memory
+  // pressure. In this case HashTable::CheckAndResize() will fail. In either case we
+  // should always use the remaining space in the hash table to avoid wasting memory.
+  if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
+    for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+      HashTable* hash_tbl = GetHashTable(idx);
+      if (remaining_capacity[idx] >= num_child_rows) continue;
+      SCOPED_TIMER(ht_resize_timer_);
+      bool resized;
+      RETURN_IF_ERROR(hash_tbl->CheckAndResize(num_child_rows, ht_ctx_.get(), &resized));
+      if (resized) remaining_capacity[idx] = hash_tbl->NumInsertsBeforeResize();
+    }
+  }
+
+  const TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  if (add_batch_streaming_impl_fn_ == nullptr) {
+    RETURN_IF_ERROR(AddBatchStreamingImpl(needs_serialize_, prefetch_mode, child_batch,
+        out_batch, ht_ctx_.get(), remaining_capacity));
+  } else {
+    RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, needs_serialize_, prefetch_mode,
+        child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
+  }
+
+  num_rows_returned_ += out_batch->num_rows();
+  COUNTER_SET(num_passthrough_rows_, num_rows_returned_);
+  return Status::OK();
+}
+
+// Called when all input has been added: hands the hash partitions over via
+// MoveHashPartitions(), passing the total number of input rows seen.
+Status GroupingAggregator::InputDone() {
+  return MoveHashPartitions(num_input_rows_);
+}
+
+// Allocates a new intermediate tuple from 'pool', with room for the fixed-length part
+// plus the var-len grouping values, copies the grouping values into it and initializes
+// the aggregate slots. On allocation failure sets 'status' to a mem-limit-exceeded
+// error and returns nullptr.
+Tuple* GroupingAggregator::ConstructIntermediateTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) noexcept {
+  const int fixed_len = intermediate_tuple_desc_->byte_size();
+  const int varlen_len = GroupingExprsVarlenSize();
+  const int total_len = fixed_len + varlen_len;
+  uint8_t* mem = pool->TryAllocate(total_len);
+  if (UNLIKELY(mem == nullptr)) {
+    string details = Substitute("Cannot perform aggregation at aggregator with id $0. "
+                                "Failed to allocate $1 bytes for intermediate tuple.",
+        id_, total_len);
+    *status = pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
+    return nullptr;
+  }
+  // Only the fixed-length part is zeroed; the var-len area starts right after it.
+  memset(mem, 0, fixed_len);
+  Tuple* result = reinterpret_cast<Tuple*>(mem);
+  CopyGroupingValues(result, mem + fixed_len, varlen_len);
+  InitAggSlots(agg_fn_evals, result);
+  return result;
+}
+
+// Same as the MemPool variant, except that the tuple's memory is allocated as a custom
+// row in 'stream'. Returns nullptr if the stream could not provide the space.
+Tuple* GroupingAggregator::ConstructIntermediateTuple(
+    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
+    Status* status) noexcept {
+  DCHECK(stream != nullptr && status != nullptr);
+  // Allocate space for the entire tuple in the stream.
+  const int fixed_len = intermediate_tuple_desc_->byte_size();
+  const int varlen_len = GroupingExprsVarlenSize();
+  const int total_len = fixed_len + varlen_len;
+  uint8_t* mem = stream->AddRowCustomBegin(total_len, status);
+  // If we failed to allocate and did not hit an error (indicated by a non-ok status),
+  // the caller of this function can try to free some space, e.g. through spilling, and
+  // re-attempt to allocate space for this row.
+  if (UNLIKELY(mem == nullptr)) return nullptr;
+
+  Tuple* result = reinterpret_cast<Tuple*>(mem);
+  result->Init(fixed_len);
+  CopyGroupingValues(result, mem + fixed_len, varlen_len);
+  InitAggSlots(agg_fn_evals, result);
+  stream->AddRowCustomEnd(total_len);
+  return result;
+}
+
+// Returns the total length of the non-NULL string grouping values currently held by
+// 'ht_ctx_'.
+int GroupingAggregator::GroupingExprsVarlenSize() {
+  // TODO: The hash table could compute this as it hashes.
+  int total_len = 0;
+  for (int string_idx : string_grouping_exprs_) {
+    const StringValue* str =
+        reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(string_idx));
+    // Multiplying by the negated null flag contributes 0 for NULL values without
+    // needing a branch.
+    total_len += str->len * !ht_ctx_->ExprValueNull(string_idx);
+  }
+  return total_len;
+}
+
+// Copies the grouping values currently held by 'ht_ctx_' into 'intermediate_tuple'.
+// String data is copied into 'buffer' and the string slots are pointed at it.
+// 'varlen_size' is not used by this function.
+// TODO: codegen this function.
+void GroupingAggregator::CopyGroupingValues(
+    Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size) {
+  // First pass: copy every grouping slot's fixed-length representation (or set NULL).
+  const int num_grouping_exprs = grouping_exprs_.size();
+  for (int idx = 0; idx < num_grouping_exprs; ++idx) {
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[idx];
+    if (!ht_ctx_->ExprValueNull(idx)) {
+      void* src = ht_ctx_->ExprValue(idx);
+      void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
+      memcpy(dst, src, slot_desc->slot_size());
+    } else {
+      intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
+    }
+  }
+
+  // Second pass: move the variable-length string data into 'buffer'.
+  uint8_t* write_pos = buffer;
+  for (int string_idx : string_grouping_exprs_) {
+    if (ht_ctx_->ExprValueNull(string_idx)) continue;
+
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[string_idx];
+    // ptr and len were already copied to the fixed-len part of string value
+    StringValue* str = reinterpret_cast<StringValue*>(
+        intermediate_tuple->GetSlot(slot_desc->tuple_offset()));
+    memcpy(write_pos, str->ptr, str->len);
+    str->ptr = reinterpret_cast<char*>(write_pos);
+    write_pos += str->len;
+  }
+}
+
+// Appends 'row' to the aggregated or unaggregated stream (selected by AGGREGATED_ROWS)
+// of the spilled 'partition'. If the append fails without an error, partitions are
+// spilled until it succeeds or an error is hit.
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::AppendSpilledRow(
+    Partition* __restrict__ partition, TupleRow* __restrict__ row) {
+  DCHECK(!is_streaming_preagg_);
+  DCHECK(partition->is_spilled());
+  BufferedTupleStream* dst_stream = nullptr;
+  if (AGGREGATED_ROWS) {
+    dst_stream = partition->aggregated_row_stream.get();
+  } else {
+    dst_stream = partition->unaggregated_row_stream.get();
+  }
+  DCHECK(!dst_stream->is_pinned());
+
+  Status status;
+  if (LIKELY(dst_stream->AddRow(row, &status))) return Status::OK();
+
+  // Keep trying to free memory by spilling until we succeed or hit an error.
+  // Running out of partitions to spill is treated as an error by SpillPartition().
+  do {
+    RETURN_IF_ERROR(status);
+    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+  } while (!dst_stream->AddRow(row, &status));
+  return Status::OK();
+}
+
+// Stores a copy of 'debug_options' in 'debug_options_', which Prepare() hands to the
+// reservation manager.
+void GroupingAggregator::SetDebugOptions(const TDebugOptions& debug_options) {
+  debug_options_ = debug_options;
+}
+
+// Returns the debug description produced by the stringstream overload as a string.
+string GroupingAggregator::DebugString(int indentation_level) const {
+  stringstream buffer;
+  DebugString(indentation_level, &buffer);
+  return buffer.str();
+}
+
+// Writes a one-line description of this aggregator to 'out', indented by two spaces
+// per 'indentation_level'.
+void GroupingAggregator::DebugString(int indentation_level, stringstream* out) const {
+  stringstream& os = *out;
+  os << string(indentation_level * 2, ' ');
+  os << "GroupingAggregator(";
+  os << "intermediate_tuple_id=" << intermediate_tuple_id_;
+  os << " output_tuple_id=" << output_tuple_id_;
+  os << " needs_finalize=" << needs_finalize_;
+  os << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_);
+  os << " agg_exprs=" << AggFn::DebugString(agg_fns_);
+  os << ")";
+}
+
+// Creates the hash partitions for 'level'. If 'single_partition_idx' is -1, all
+// PARTITION_FANOUT partitions are created; otherwise only the partition at that index
+// is created and every slot of 'hash_partitions_' / 'hash_tbls_' is pointed at it.
+// Returns an error if 'level' has reached MAX_PARTITION_DEPTH.
+Status GroupingAggregator::CreateHashPartitions(int level, int single_partition_idx) {
+  if (is_streaming_preagg_) DCHECK_EQ(level, 0);
+  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
+    return Status(
+        TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH);
+  }
+  ht_ctx_->set_level(level);
+
+  DCHECK(hash_partitions_.empty());
+  const bool create_all = single_partition_idx == -1;
+  int num_created = 0;
+  for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+    hash_tbls_[idx] = nullptr;
+    if (!create_all && idx != single_partition_idx) {
+      hash_partitions_.push_back(nullptr);
+      continue;
+    }
+    Partition* created = partition_pool_->Add(new Partition(this, level, idx));
+    ++num_created;
+    hash_partitions_.push_back(created);
+    RETURN_IF_ERROR(created->InitStreams());
+  }
+
+  // Now that all the streams are reserved (meaning we have enough memory to execute
+  // the algorithm), allocate the hash tables. These can fail and we can still continue.
+  for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+    Partition* partition = hash_partitions_[idx];
+    if (partition == nullptr) continue;
+    if (partition->aggregated_row_stream != nullptr) {
+      bool got_memory;
+      RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
+      // Spill the partition if we cannot create a hash table for a merge aggregation.
+      if (UNLIKELY(!got_memory)) {
+        DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables";
+        // If we're repartitioning, we will be writing aggregated rows first.
+        RETURN_IF_ERROR(partition->Spill(level > 0));
+      }
+    } else {
+      // Failed to create the aggregated row stream - cannot create a hash table.
+      // Just continue with a NULL hash table so rows will be passed through.
+      DCHECK(is_streaming_preagg_);
+    }
+    hash_tbls_[idx] = partition->hash_tbl.get();
+  }
+
+  // In this case we did not have to repartition, so ensure that while building the hash
+  // table all rows will be inserted into the partition at 'single_partition_idx' in case
+  // a non deterministic grouping expression causes a row to hash to a different
+  // partition index.
+  if (!create_all) {
+    Partition* only_partition = hash_partitions_[single_partition_idx];
+    for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+      hash_partitions_[idx] = only_partition;
+      hash_tbls_[idx] = only_partition->hash_tbl.get();
+    }
+  }
+
+  COUNTER_ADD(partitions_created_, num_created);
+  if (!is_streaming_preagg_) {
+    COUNTER_SET(max_partition_level_, level);
+  }
+  return Status::OK();
+}
+
+// For every non-spilled hash partition, makes sure its hash table can take 'num_rows'
+// more rows. While a resize does not succeed, SpillPartition() is called and the check
+// is retried, until the table was resized or the partition itself has been spilled.
+Status GroupingAggregator::CheckAndResizeHashPartitions(
+    bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) {
+  DCHECK(!is_streaming_preagg_);
+  for (int idx = 0; idx < PARTITION_FANOUT; ++idx) {
+    Partition* current = hash_partitions_[idx];
+    if (current == nullptr) continue;
+    while (!current->is_spilled()) {
+      bool resized;
+      {
+        // Only the resize attempt is timed, not the spilling below.
+        SCOPED_TIMER(ht_resize_timer_);
+        RETURN_IF_ERROR(current->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized));
+      }
+      if (resized) break;
+      RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows));
+    }
+  }
+  return Status::OK();
+}
+
+// Picks the next partition to output and points 'output_partition_' and
+// 'output_iterator_' at it. In-memory aggregated partitions are taken first; otherwise
+// spilled partitions are rebuilt (and repartitioned when they do not fit) until one is
+// in memory.
+Status GroupingAggregator::NextPartition() {
+  DCHECK(output_partition_ == nullptr);
+
+  if (spilled_partitions_.empty() && !is_in_subplan_) {
+    // All partitions are in memory. Release reservation that was used for previous
+    // partitions that is no longer needed. If we have spilled partitions, we want to
+    // hold onto all reservation in case it is needed to process the spilled partitions.
+    DCHECK(!buffer_pool_client()->has_unpinned_pages());
+    Status status = reservation_manager_.ReleaseUnusedReservation();
+    DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
+                        << "no unpinned pages. " << status.GetDetail();
+  }
+
+  // Keep looping until we get to a partition that fits in memory.
+  Partition* next = nullptr;
+  for (;;) {
+    // First return partitions that are fully aggregated (and in memory).
+    if (!aggregated_partitions_.empty()) {
+      next = aggregated_partitions_.front();
+      DCHECK(!next->is_spilled());
+      aggregated_partitions_.pop_front();
+      break;
+    }
+
+    // No aggregated partitions in memory - we should not be using any reservation aside
+    // from 'serialize_stream_'.
+    DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
+        buffer_pool_client()->GetUsedReservation())
+        << buffer_pool_client()->DebugString();
+
+    // Try to fit a single spilled partition in memory. We can often do this because
+    // we only need to fit 1/PARTITION_FANOUT of the data in memory.
+    // TODO: in some cases when the partition probably won't fit in memory it could
+    // be better to skip directly to repartitioning.
+    RETURN_IF_ERROR(BuildSpilledPartition(&next));
+    if (next != nullptr) break;
+
+    // If we can't fit the partition in memory, repartition it.
+    RETURN_IF_ERROR(RepartitionSpilledPartition());
+  }
+  DCHECK(!next->is_spilled());
+  DCHECK(next->hash_tbl.get() != nullptr);
+  DCHECK(next->aggregated_row_stream->is_pinned());
+
+  output_partition_ = next;
+  output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
+  COUNTER_ADD(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
+  return Status::OK();
+}
+
+// Rebuilds the first spilled partition into a fresh hash partition by replaying its
+// aggregated and then its unaggregated rows. On success '*built_partition' is the new
+// in-memory partition, or nullptr if the new partition spilled again (in which case it
+// is pushed back as a spilled partition).
+Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) {
+  DCHECK(!spilled_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+  // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
+  Partition* source = spilled_partitions_.front();
+  DCHECK(source->is_spilled());
+
+  // Create a new hash partition from the rows of the spilled partition. This is simpler
+  // than trying to finish building a partially-built partition in place. We only
+  // initialise one hash partition that all rows in 'source' will hash to.
+  RETURN_IF_ERROR(CreateHashPartitions(source->level, source->idx));
+  Partition* rebuilt = hash_partitions_[source->idx];
+  DCHECK(rebuilt != nullptr);
+
+  // Rebuild the hash table over spilled aggregate rows then start adding unaggregated
+  // rows to the hash table. It's possible the partition will spill at either stage.
+  // In that case we need to finish processing 'source' so that all rows are
+  // appended to 'rebuilt'.
+  // TODO: if the partition spills again but the aggregation reduces the input
+  // significantly, we could do better here by keeping the incomplete hash table in
+  // memory and only spilling unaggregated rows that didn't fit in the hash table
+  // (somewhat similar to the passthrough pre-aggregation).
+  RETURN_IF_ERROR(ProcessStream<true>(source->aggregated_row_stream.get()));
+  RETURN_IF_ERROR(ProcessStream<false>(source->unaggregated_row_stream.get()));
+  source->Close(false);
+  spilled_partitions_.pop_front();
+  hash_partitions_.clear();
+
+  if (!rebuilt->is_spilled()) {
+    *built_partition = rebuilt;
+    return Status::OK();
+  }
+
+  PushSpilledPartition(rebuilt);
+  *built_partition = nullptr;
+  // Spilled the partition - we should not be using any reservation except from
+  // 'serialize_stream_'.
+  DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
+      buffer_pool_client()->GetUsedReservation())
+      << buffer_pool_client()->DebugString();
+  return Status::OK();
+}
+
+// Repartitions the partition at the front of 'spilled_partitions_' into a new set of
+// hash partitions created at 'level + 1', feeding its aggregated rows first and its
+// unaggregated rows second. On success the source partition is closed and removed
+// from 'spilled_partitions_', and the new partitions are handed to
+// MoveHashPartitions(). Returns the first error hit by any step; in that case the
+// source partition is still at the front of 'spilled_partitions_'.
+Status GroupingAggregator::RepartitionSpilledPartition() {
+  DCHECK(!spilled_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+  // Leave the partition in 'spilled_partitions_' to be closed if we hit an 
error.
+  Partition* partition = spilled_partitions_.front();
+  DCHECK(partition->is_spilled());
+
+  // Create the new hash partitions to repartition into. This will allocate a
+  // write buffer for each partition's aggregated row stream.
+  RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
+  COUNTER_ADD(num_repartitions_, 1);
+
+  // Rows in this partition could have been spilled into two streams, depending
+  // on if it is an aggregated intermediate, or an unaggregated row. Aggregated
+  // rows are processed first to save a hash table lookup in AddBatchImpl().
+  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
+
+  // Prepare write buffers so we can append spilled rows to unaggregated 
partitions.
+  for (Partition* hash_partition : hash_partitions_) {
+    if (!hash_partition->is_spilled()) continue;
+    // The aggregated rows have been repartitioned. Free up at least a 
buffer's worth of
+    // reservation and use it to pin the unaggregated write buffer.
+    
hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    bool got_buffer;
+    RETURN_IF_ERROR(
+        hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    DCHECK(got_buffer) << "Accounted in min reservation"
+                       << buffer_pool_client()->DebugString();
+  }
+  
RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
+
+  // Both input streams count towards the number of repartitioned rows.
+  COUNTER_ADD(num_row_repartitioned_, 
partition->aggregated_row_stream->num_rows());
+  COUNTER_ADD(num_row_repartitioned_, 
partition->unaggregated_row_stream->num_rows());
+
+  partition->Close(false);
+  spilled_partitions_.pop_front();
+
+  // Done processing this partition. Move the new partitions into
+  // spilled_partitions_/aggregated_partitions_.
+  // NOTE(review): these row counts are read after partition->Close(false), and after
+  // ProcessStream() already closed both input streams. This presumably relies on
+  // num_rows() still returning the total on a closed stream - confirm.
+  int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
+      + partition->unaggregated_row_stream->num_rows();
+  RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
+  return Status::OK();
+}
+
+// Reads every row of 'input_stream' and passes it, batch by batch, to
+// AddBatchImpl<AGGREGATED_ROWS>(). The stream is closed once it has been consumed,
+// including when it holds no rows. AGGREGATED_ROWS selects the row layout of the
+// batches: the intermediate row descriptor when true, the input row descriptor
+// otherwise.
+template <bool AGGREGATED_ROWS>
+Status GroupingAggregator::ProcessStream(BufferedTupleStream* input_stream) {
+  DCHECK(!is_streaming_preagg_);
+  const bool has_rows = input_stream->num_rows() > 0;
+  if (has_rows) {
+    // Keep spilling partitions until the stream manages to get a read buffer.
+    bool read_ready = false;
+    RETURN_IF_ERROR(input_stream->PrepareForRead(true, &read_ready));
+    while (!read_ready) {
+      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+      RETURN_IF_ERROR(input_stream->PrepareForRead(true, &read_ready));
+    }
+
+    const TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+    const RowDescriptor* row_desc = &input_row_desc_;
+    if (AGGREGATED_ROWS) row_desc = &intermediate_row_desc_;
+    RowBatch batch(row_desc, state_->batch_size(), mem_tracker_.get());
+
+    // 'eos' starts out false, so at least one batch is always fetched.
+    for (bool eos = false; !eos;) {
+      RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
+      RETURN_IF_ERROR(
+          AddBatchImpl<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
+      RETURN_IF_ERROR(QueryMaintenance(state_));
+      batch.Reset();
+    }
+  }
+  input_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  return Status::OK();
+}
+
+// Spills the in-memory hash partition that currently holds the most memory and
+// returns the status of Partition::Spill(). 'more_aggregate_rows' is forwarded to
+// Partition::Spill() unchanged.
+Status GroupingAggregator::SpillPartition(bool more_aggregate_rows) {
+  int victim_idx = -1;
+  int64_t victim_bytes = 0;
+
+  // Find the largest partition that is still open and not yet spilled.
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* candidate = hash_partitions_[i];
+    if (candidate == nullptr) continue;
+    if (candidate->is_closed || candidate->is_spilled()) continue;
+    // 'true' is passed because the write block has to stay pinned. See
+    // Partition::Spill().
+    int64_t candidate_bytes = candidate->aggregated_row_stream->BytesPinned(true);
+    candidate_bytes += candidate->hash_tbl->ByteSize();
+    candidate_bytes += candidate->agg_fn_perm_pool->total_reserved_bytes();
+    // The hash table buckets, at the very least, should take up memory.
+    DCHECK_GT(candidate_bytes, 0);
+    if (candidate_bytes > victim_bytes) {
+      victim_bytes = candidate_bytes;
+      victim_idx = i;
+    }
+  }
+  DCHECK_NE(victim_idx, -1) << "Should have been able to spill a partition to "
+                            << "reclaim memory: "
+                            << buffer_pool_client()->DebugString();
+
+  // Drop every cached pointer in 'hash_tbls_' that refers to the hash table about
+  // to be destroyed. When a spilled partition is being rebuilt, all slots can refer
+  // to the same in-memory partition, so each slot is compared against the victim to
+  // keep 'hash_tbls_' consistent in that case too.
+  Partition* const victim = hash_partitions_[victim_idx];
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    if (hash_partitions_[i] == victim) hash_tbls_[i] = nullptr;
+  }
+  return victim->Spill(more_aggregate_rows);
+}
+
+// Distributes the partitions in 'hash_partitions_' once their input has been
+// consumed: partitions without rows are closed, spilled partitions are pushed onto
+// 'spilled_partitions_' and the remaining ones are appended to
+// 'aggregated_partitions_'. 'num_input_rows' is the number of rows that were
+// partitioned; it is only used to report each partition's share of the input in the
+// log and in 'largest_partition_percent_'. Clears 'hash_partitions_' and always
+// returns OK.
+Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
+  DCHECK(!hash_partitions_.empty());
+  stringstream ss;
+  ss << "PA(node_id=" << id_ << ") partitioned(level=" << hash_partitions_[0]->level
+     << ") " << num_input_rows << " rows into:" << endl;
+  const int num_partitions = static_cast<int>(hash_partitions_.size());
+  for (int i = 0; i < num_partitions; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition == nullptr) continue;
+    // We might be dealing with a rebuilt spilled partition, where all partitions are
+    // pointing to a single in-memory partition, so make sure we only proceed for the
+    // right partition.
+    if (i != partition->idx) continue;
+    int64_t aggregated_rows = 0;
+    if (partition->aggregated_row_stream != nullptr) {
+      aggregated_rows = partition->aggregated_row_stream->num_rows();
+    }
+    int64_t unaggregated_rows = 0;
+    if (partition->unaggregated_row_stream != nullptr) {
+      unaggregated_rows = partition->unaggregated_row_stream->num_rows();
+    }
+    const double total_rows = aggregated_rows + unaggregated_rows;
+    // With no input rows the division below would be 0/0 and produce NaN, and
+    // converting NaN to int64_t for the counter is undefined behaviour. Report 0%
+    // in that case instead.
+    const double percent =
+        num_input_rows == 0 ? 0.0 : total_rows * 100 / num_input_rows;
+    ss << "  " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
+       << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
+       << "    #aggregated rows:" << aggregated_rows << endl
+       << "    #unaggregated rows: " << unaggregated_rows << endl;
+
+    // TODO: update counters to support doubles.
+    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
+
+    if (total_rows == 0) {
+      // Nothing to return or to process further for an empty partition.
+      partition->Close(false);
+    } else if (partition->is_spilled()) {
+      PushSpilledPartition(partition);
+    } else {
+      aggregated_partitions_.push_back(partition);
+    }
+  }
+  VLOG(2) << ss.str();
+  hash_partitions_.clear();
+  return Status::OK();
+}
+
+// Puts the spilled 'partition' at the front of 'spilled_partitions_' after fully
+// unpinning both of its streams. The partition must be spilled and must no longer
+// have a hash table.
+void GroupingAggregator::PushSpilledPartition(Partition* partition) {
+  DCHECK(partition->is_spilled());
+  DCHECK(partition->hash_tbl == nullptr);
+  // UNPIN_ALL invalidates the read and write iterators of each stream so that none
+  // of its pages stay pinned. All of the memory may be needed to process the next
+  // spilled partitions.
+  BufferedTupleStream* const aggregated = partition->aggregated_row_stream.get();
+  aggregated->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+  BufferedTupleStream* const unaggregated = partition->unaggregated_row_stream.get();
+  unaggregated->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+  spilled_partitions_.push_front(partition);
+}
+
+// Closes every partition tracked in 'hash_partitions_', 'aggregated_partitions_' and
+// 'spilled_partitions_', empties those containers, nulls out the 'hash_tbls_' cache
+// and clears 'partition_pool_'.
+void GroupingAggregator::ClosePartitions() {
+  // 'hash_partitions_' may contain null slots, which are skipped.
+  for (Partition* hash_partition : hash_partitions_) {
+    if (hash_partition == nullptr) continue;
+    hash_partition->Close(true);
+  }
+  hash_partitions_.clear();
+
+  for (Partition* aggregated_partition : aggregated_partitions_) {
+    aggregated_partition->Close(true);
+  }
+  aggregated_partitions_.clear();
+
+  for (Partition* spilled_partition : spilled_partitions_) {
+    spilled_partition->Close(true);
+  }
+  spilled_partitions_.clear();
+
+  memset(hash_tbls_, 0, sizeof(hash_tbls_));
+  partition_pool_->Clear();
+}
+
+// Returns the minimum reservation, in bytes, computed from 'resource_profile_'.
+// Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
+int64_t GroupingAggregator::MinReservation() const {
+  if (!is_streaming_preagg_) {
+    // One buffer per partition plus one more, and an additional one when a
+    // serialize step is needed.
+    int num_buffers = PARTITION_FANOUT + 1;
+    if (needs_serialize_) ++num_buffers;
+    // Two of the buffers must be able to hold the maximum row; the others use the
+    // regular spillable buffer size.
+    return resource_profile_.spillable_buffer_size * (num_buffers - 2)
+        + resource_profile_.max_row_buffer_size * 2;
+  }
+  // Streaming pre-aggregation: at least one buffer and a 64kb hash table for each
+  // partition.
+  return (resource_profile_.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT;
+}
+
+// Clears 'expr_results_pool_' and then returns the result of
+// 'state->CheckQueryState()'.
+Status GroupingAggregator::QueryMaintenance(RuntimeState* state) {
+  expr_results_pool_->Clear();
+  Status query_status = state->CheckQueryState();
+  return query_status;
+}
+
+// Returns the buffer pool client handle held by 'reservation_manager_'.
+BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() {
+  BufferPool::ClientHandle* client = reservation_manager_.buffer_pool_client();
+  return client;
+}
+
+// Builds the codegen'd version of AddBatchImpl(): fetches the cross-compiled IR
+// function, replaces its 'prefetch_mode' argument with the constant passed in,
+// swaps the "EvalProbeRow", "HashRow", "Equals" and "UpdateTuple" call sites for
+// freshly generated functions, replaces the hash table constants, finalizes the
+// function and registers it with the JIT so that 'add_batch_impl_fn_' gets set.
+// Returns an error if generating any of the helper functions fails or if the
+// finalized function fails verification.
+Status GroupingAggregator::CodegenAddBatchImpl(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
+  llvm::Function* update_tuple_fn;
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
+
+  // Get the cross compiled update row batch function
+  IRFunction::Type ir_fn = IRFunction::GROUPING_AGG_ADD_BATCH_IMPL;
+  llvm::Function* add_batch_impl_fn = codegen->GetFunction(ir_fn, true);
+  DCHECK(add_batch_impl_fn != nullptr);
+
+  int replaced;
+  // Codegen for grouping using hash table
+
+  // Replace prefetch_mode with constant so branches can be optimised out.
+  // NOTE(review): index 3 does not match the position of 'prefetch_mode' in the C++
+  // signature; presumably the IR function has extra leading arguments (return slot
+  // and 'this') - confirm against the IR signature.
+  llvm::Value* prefetch_mode_arg = codegen->GetArgument(add_batch_impl_fn, 3);
+  
prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
+
+  // The codegen'd AddBatchImpl function is only used in Open() with level_ = 
0,
+  // so don't use murmur hash
+  llvm::Function* hash_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, /* use murmur */ false, 
&hash_fn));
+
+  // Codegen HashTable::Equals<true>
+  llvm::Function* build_equals_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
+
+  // Codegen for evaluating input rows
+  llvm::Function* eval_grouping_expr_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, 
&eval_grouping_expr_fn));
+
+  // Replace call sites
+  // Each of the next three replacements is expected to hit exactly one call site.
+  replaced =
+      codegen->ReplaceCallSites(add_batch_impl_fn, eval_grouping_expr_fn, 
"EvalProbeRow");
+  DCHECK_EQ(replaced, 1);
+
+  replaced = codegen->ReplaceCallSites(add_batch_impl_fn, hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  replaced = codegen->ReplaceCallSites(add_batch_impl_fn, build_equals_fn, 
"Equals");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace the hash table constants; every kind of constant is expected to be
+  // replaced at least once.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = false;
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(
+      codegen, stores_duplicates, 1, add_batch_impl_fn, &replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_GE(replaced_constants.finds_some_nulls, 1);
+  DCHECK_GE(replaced_constants.stores_duplicates, 1);
+  DCHECK_GE(replaced_constants.stores_tuples, 1);
+  DCHECK_GE(replaced_constants.quadratic_probing, 1);
+
+  // Unlike the replacements above, "UpdateTuple" may have more than one call site.
+  replaced = codegen->ReplaceCallSites(add_batch_impl_fn, update_tuple_fn, 
"UpdateTuple");
+  DCHECK_GE(replaced, 1);
+  // FinalizeFunction() returns nullptr when the patched function fails verification.
+  add_batch_impl_fn = codegen->FinalizeFunction(add_batch_impl_fn);
+  if (add_batch_impl_fn == nullptr) {
+    return Status("GroupingAggregator::CodegenAddBatchImpl(): codegen'd "
+                  "AddBatchImpl() function failed verification, see log");
+  }
+
+  // Register the function with the JIT, passing the address of 'add_batch_impl_fn_'
+  // as the location for the compiled function pointer.
+  void** codegened_fn_ptr = reinterpret_cast<void**>(&add_batch_impl_fn_);
+  codegen->AddFunctionToJit(add_batch_impl_fn, codegened_fn_ptr);
+  return Status::OK();
+}
+
+// Builds the codegen'd version of AddBatchStreamingImpl() for a streaming
+// pre-aggregation: fetches the cross-compiled IR function, replaces its
+// 'needs_serialize' and 'prefetch_mode' arguments with constants, swaps the
+// "UpdateTuple", "EvalProbeRow", "HashRow" and "Equals" call sites for freshly
+// generated functions, replaces the hash table constants, finalizes the function and
+// registers it with the JIT so that 'add_batch_streaming_impl_fn_' gets set.
+// Returns an error if generating any of the helper functions fails or if the
+// finalized function fails verification.
+Status GroupingAggregator::CodegenAddBatchStreamingImpl(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
+  DCHECK(is_streaming_preagg_);
+
+  IRFunction::Type ir_fn = IRFunction::GROUPING_AGG_ADD_BATCH_STREAMING_IMPL;
+  llvm::Function* add_batch_streaming_impl_fn = codegen->GetFunction(ir_fn, 
true);
+  DCHECK(add_batch_streaming_impl_fn != nullptr);
+
+  // Make needs_serialize arg constant so dead code can be optimised out.
+  // NOTE(review): indices 2 and 3 do not match the positions of these parameters in
+  // the C++ signature; presumably the IR function has extra leading arguments
+  // (return slot and 'this') - confirm against the IR signature.
+  llvm::Value* needs_serialize_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 2);
+  
needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_));
+
+  // Replace prefetch_mode with constant so branches can be optimised out.
+  llvm::Value* prefetch_mode_arg = 
codegen->GetArgument(add_batch_streaming_impl_fn, 3);
+  
prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
+
+  llvm::Function* update_tuple_fn;
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
+
+  // We only use the top-level hash function for streaming aggregations.
+  llvm::Function* hash_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
+
+  // Codegen HashTable::Equals
+  llvm::Function* equals_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &equals_fn));
+
+  // Codegen for evaluating input rows
+  llvm::Function* eval_grouping_expr_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, 
&eval_grouping_expr_fn));
+
+  // Replace call sites
+  // "UpdateTuple" is expected at exactly two call sites, the others at exactly one.
+  int replaced = codegen->ReplaceCallSites(
+      add_batch_streaming_impl_fn, update_tuple_fn, "UpdateTuple");
+  DCHECK_EQ(replaced, 2);
+
+  replaced = codegen->ReplaceCallSites(
+      add_batch_streaming_impl_fn, eval_grouping_expr_fn, "EvalProbeRow");
+  DCHECK_EQ(replaced, 1);
+
+  replaced = codegen->ReplaceCallSites(add_batch_streaming_impl_fn, hash_fn, 
"HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  replaced = codegen->ReplaceCallSites(add_batch_streaming_impl_fn, equals_fn, 
"Equals");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace the hash table constants; every kind of constant is expected to be
+  // replaced at least once.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = false;
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(
+      codegen, stores_duplicates, 1, add_batch_streaming_impl_fn, 
&replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_GE(replaced_constants.finds_some_nulls, 1);
+  DCHECK_GE(replaced_constants.stores_duplicates, 1);
+  DCHECK_GE(replaced_constants.stores_tuples, 1);
+  DCHECK_GE(replaced_constants.quadratic_probing, 1);
+
+  // FinalizeFunction() returns nullptr when the patched function fails verification.
+  DCHECK(add_batch_streaming_impl_fn != nullptr);
+  add_batch_streaming_impl_fn = 
codegen->FinalizeFunction(add_batch_streaming_impl_fn);
+  if (add_batch_streaming_impl_fn == nullptr) {
+    return Status("GroupingAggregator::CodegenAddBatchStreamingImpl(): 
codegen'd "
+                  "AddBatchStreamingImpl() function failed verification, see 
log");
+  }
+
+  // Register the function with the JIT, passing the address of
+  // 'add_batch_streaming_impl_fn_' as the location for the compiled function pointer.
+  codegen->AddFunctionToJit(add_batch_streaming_impl_fn,
+      reinterpret_cast<void**>(&add_batch_streaming_impl_fn_));
+  return Status::OK();
+}
+
+// Instantiate required templates.
+template Status GroupingAggregator::AppendSpilledRow<false>(Partition*, 
TupleRow*);
+template Status GroupingAggregator::AppendSpilledRow<true>(Partition*, 
TupleRow*);
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.h 
b/be/src/exec/grouping-aggregator.h
new file mode 100644
index 0000000..0d1b893
--- /dev/null
+++ b/be/src/exec/grouping-aggregator.h
@@ -0,0 +1,624 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_GROUPING_AGGREGATOR_H
+#define IMPALA_EXEC_GROUPING_AGGREGATOR_H
+
+#include <deque>
+#include <memory>
+#include <vector>
+
+#include "exec/aggregator.h"
+#include "exec/hash-table.h"
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/bufferpool/suballocator.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/reservation-manager.h"
+
+namespace impala {
+
+class AggFnEvaluator;
+class LlvmCodeGen;
+class RowBatch;
+class RuntimeState;
+class Tuple;
+
+/// Aggregator for doing grouping aggregations. Input is passed to the 
aggregator through
+/// AddBatch(), or AddBatchStreaming() if this is a pre-agg. Then:
+///  1. Each row is hashed and we pick a dst partition (hash_partitions_).
+///  2. If the dst partition is not spilled, we probe into the partition's hash 
table
+///  to aggregate/insert the row.
+///  3. If the partition is already spilled, the input row is spilled.
+///  4. When all the input is consumed, we walk hash_partitions_, put the 
spilled ones
+///  into spilled_partitions_ and the non-spilled ones into 
aggregated_partitions_.
+///  aggregated_partitions_ contain partitions that are fully processed and 
the result
+///  can just be returned. Partitions in spilled_partitions_ need to be 
repartitioned
+///  and we just repeat these steps.
+//
+/// Each partition contains these structures:
+/// 1) Hash Table for aggregated rows. This contains just the hash table 
directory
+///    structure but not the rows themselves. This is NULL for spilled 
partitions when
+///    we stop maintaining the hash table.
+/// 2) MemPool for var-len result data for rows in the hash table. If the 
aggregate
+///    function returns a string, we cannot append it to the tuple stream as 
that
+///    structure is immutable. Instead, when we need to spill, we sweep and 
copy the
+///    rows into a tuple stream.
+/// 3) Aggregated tuple stream for rows that are/were in the hash table. This 
stream
+///    contains rows that are aggregated. When the partition is not spilled, 
this stream
+///    is pinned and contains the memory referenced by the hash table.
+///    In the case where the aggregate function does not return a string 
(meaning the
+///    size of all the slots is known when the row is constructed), this 
stream contains
+///    all the memory for the result rows and the MemPool (2) is not used.
+/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
+///    Rows in this stream always have child(0)'s layout.
+///
+/// Buffering: Each stream and hash table needs to maintain at least one 
buffer when
+/// it is being read or written. The streams for a given agg use a uniform 
buffer size,
+/// except when processing rows larger than that buffer size. In that case, 
the agg uses
+/// BufferedTupleStream's variable buffer size support to handle larger rows 
up to the
+/// maximum row size. Only two max-sized buffers are needed for the agg to 
spill: one
+/// to hold rows being read from a spilled input stream and another for a 
temporary write
+/// buffer when adding a row to an output stream.
+///
+/// Two-phase aggregation: we support two-phase distributed aggregations, where
+/// pre-aggregations attempt to reduce the size of data before shuffling data 
across the
+/// network to be merged by the merge aggregation node. This aggregator 
supports a
+/// streaming mode for pre-aggregations where it maintains a hash table of 
aggregated
+/// rows, but can pass through unaggregated rows (after transforming them into 
the
+/// same tuple format as aggregated rows) when a heuristic determines that it 
is better
+/// to send rows across the network instead of consuming additional memory and 
CPU
+/// resources to expand its hash table. The planner decides whether a given
+/// pre-aggregation should use the streaming preaggregation algorithm or the 
same
+/// blocking aggregation algorithm as used in merge aggregations.
+/// TODO: make this less of a heuristic by factoring in the cost of the 
exchange vs the
+/// cost of the pre-aggregation.
+///
+/// Handling memory pressure: the node uses two different strategies for 
responding to
+/// memory pressure, depending on whether it is a streaming pre-aggregation or 
not. If
+/// the node is a streaming preaggregation, it stops growing its hash table 
further by
+/// converting unaggregated rows into the aggregated tuple format and passing 
them
+/// through. If the node is not a streaming pre-aggregation, it responds to 
memory
+/// pressure by spilling partitions to disk.
+///
+/// TODO: Buffer rows before probing into the hash table?
+/// TODO: After spilling, we can still maintain a very small hash table just 
to remove
+/// some number of rows (from likely going to disk).
+/// TODO: Consider allowing to spill the hash table structure in addition to 
the rows.
+/// TODO: Do we want to insert a buffer before probing into the partition's 
hash table?
+/// TODO: Use a prefetch/batched probe interface.
+/// TODO: Return rows from the aggregated_row_stream rather than the HT.
+/// TODO: Think about spilling heuristic.
+/// TODO: When processing a spilled partition, we have a lot more information 
and can
+/// size the partitions/hash tables better.
+/// TODO: Start with unpartitioned (single partition) and switch to 
partitioning and
+/// spilling only if the size gets large, say larger than the LLC.
+/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and 
ctx.
+/// There are so many contexts in use that a plain "ctx" variable should never 
be used.
+/// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to 
simplify this.
+/// TODO: support an Init() method with an initial value in the UDAF interface.
+class GroupingAggregator : public Aggregator {
+ public:
+  GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& 
tnode,
+      const DescriptorTbl& descs);
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
+
+  virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
+  /// Used to insert input rows if this is a streaming pre-agg. Tries to 
aggregate all of
+  /// the rows of 'child_batch', but if there isn't enough memory available 
rows will be
+  /// streamed through and returned in 'out_batch'. AddBatch() and 
AddBatchStreaming()
+  /// should not be called on the same GroupingAggregator.
+  Status AddBatchStreaming(
+      RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch);
+  virtual Status InputDone() override WARN_UNUSED_RESULT;
+
+  virtual int num_grouping_exprs() override { return grouping_exprs_.size(); }
+
+  virtual void SetDebugOptions(const TDebugOptions& debug_options) override;
+
+  virtual std::string DebugString(int indentation_level = 0) const override;
+  virtual void DebugString(int indentation_level, std::stringstream* out) 
const override;
+
+ private:
+  struct Partition;
+
+  /// Number of initial partitions to create. Must be a power of 2.
+  static const int PARTITION_FANOUT = 16;
+
+  /// Needs to be log2(PARTITION_FANOUT).
+  /// We use the upper bits to pick the partition and lower bits in the HT.
+  /// TODO: different hash functions here too? We don't need that many bits to 
pick
+  /// the partition so this might be okay.
+  static const int NUM_PARTITIONING_BITS = 4;
+
+  /// Maximum number of times we will repartition. The maximum build table we 
can process
+  /// (if we have enough scratch disk space) in case there is no skew is:
+  ///  MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
+  /// In the case where there is skew, repartitioning is unlikely to help 
(assuming a
+  /// reasonable hash function).
+  /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
+  /// TODO: we can revisit and try harder to explicitly detect skew.
+  static const int MAX_PARTITION_DEPTH = 16;
+
+  /// Default initial number of buckets in a hash table.
+  /// TODO: rethink this ?
+  static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
+
+  /// Codegen doesn't allow for automatic Status variables because then 
exception
+  /// handling code is needed to destruct the Status, and our function call 
substitution
+  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. 
Work around that by
+  /// placing the Status here so exceptions won't need to destruct it.
+  /// TODO: fix IMPALA-1948 and remove this.
+  Status add_batch_status_;
+
+  /// Row with the intermediate tuple as its only tuple.
+  /// Construct a new row desc for preparing the build exprs because neither 
the child's
+  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
+  /// in a single-node plan with an intermediate tuple different from the 
output tuple.
+  /// Lives in the query state's obj_pool.
+  RowDescriptor intermediate_row_desc_;
+
+  /// True if this is first phase of a two-phase distributed aggregation for 
which we
+  /// are doing a streaming preaggregation.
+  const bool is_streaming_preagg_;
+
+  /// True if any of the evaluators require the serialize step.
+  bool needs_serialize_;
+
+  /// Exprs used to evaluate input rows
+  std::vector<ScalarExpr*> grouping_exprs_;
+
+  /// Exprs used to insert constructed aggregation tuple into the hash table.
+  /// All the exprs are simply SlotRefs for the intermediate tuple.
+  std::vector<ScalarExpr*> build_exprs_;
+
+  /// Indices of grouping exprs with var-len string types in grouping_exprs_.
+  /// We need to do more work for var-len expressions when allocating and 
spilling rows.
+  /// All var-len grouping exprs have type string.
+  std::vector<int> string_grouping_exprs_;
+
+  RuntimeState* state_;
+
+  /// Allocator for hash table memory.
+  std::unique_ptr<Suballocator> ht_allocator_;
+
+  /// MemPool used to allocate memory during Close() when creating new output 
tuples. The
+  /// pool should not be Reset() to allow amortizing memory allocation over a 
series of
+  /// Reset()/Open()/GetNext()* calls.
+  std::unique_ptr<MemPool> tuple_pool_;
+
+  /// The current partition and iterator to the next row in its hash table 
that we need
+  /// to return in GetNext()
+  Partition* output_partition_;
+  HashTable::Iterator output_iterator_;
+
+  /// Resource information sent from the frontend.
+  const TBackendResourceProfile resource_profile_;
+
+  ReservationManager reservation_manager_;
+  BufferPool::ClientHandle* buffer_pool_client();
+
+  /// The number of rows that have been passed to AddBatch() or 
AddBatchStreaming().
+  int64_t num_input_rows_;
+
+  /// True if this aggregator is being executed in a subplan.
+  const bool is_in_subplan_;
+
+  int64_t limit_; // -1: no limit
+  bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
+
+  typedef Status (*AddBatchImplFn)(
+      GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
+  /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
+  AddBatchImplFn add_batch_impl_fn_;
+
+  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, bool,
+      TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, 
int[PARTITION_FANOUT]);
+  /// Jitted AddBatchStreamingImpl function pointer.  Null if codegen is 
disabled.
+  AddBatchStreamingImplFn add_batch_streaming_impl_fn_;
+
+  /// Total time spent resizing hash tables.
+  RuntimeProfile::Counter* ht_resize_timer_;
+
+  /// Time spent returning the aggregated rows
+  RuntimeProfile::Counter* get_results_timer_;
+
+  /// Total number of hash buckets across all partitions.
+  RuntimeProfile::Counter* num_hash_buckets_;
+
+  /// Total number of partitions created.
+  RuntimeProfile::Counter* partitions_created_;
+
+  /// Level of max partition (i.e. number of repartitioning steps).
+  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
+
+  /// Number of rows that have been repartitioned.
+  RuntimeProfile::Counter* num_row_repartitioned_;
+
+  /// Number of partitions that have been repartitioned.
+  RuntimeProfile::Counter* num_repartitions_;
+
+  /// Number of partitions that have been spilled.
+  RuntimeProfile::Counter* num_spilled_partitions_;
+
+  /// The largest fraction after repartitioning. This is expected to be
+  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
+  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
+
+  /// Time spent in streaming preagg algorithm.
+  RuntimeProfile::Counter* streaming_timer_;
+
+  /// The number of rows passed through without aggregation.
+  RuntimeProfile::Counter* num_passthrough_rows_;
+
+  /// The estimated reduction of the preaggregation.
+  RuntimeProfile::Counter* preagg_estimated_reduction_;
+
+  /// Expose the minimum reduction factor to continue growing the hash tables.
+  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
+
+  /// The estimated number of input rows from the planner.
+  int64_t estimated_input_cardinality_;
+
+  TDebugOptions debug_options_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// If true, no more rows to output from partitions.
+  bool partition_eos_;
+
+  /// Used for hash-related functionality, such as evaluating rows and 
calculating hashes.
+  /// It also owns the evaluators for the grouping and build expressions used 
during hash
+  /// table insertion and probing.
+  boost::scoped_ptr<HashTableCtx> ht_ctx_;
+
+  /// Object pool that holds the Partition objects in hash_partitions_.
+  std::unique_ptr<ObjectPool> partition_pool_;
+
+  /// Current partitions we are partitioning into. IMPALA-5788: For the case 
where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this 
vector will
+  /// point to a single in-memory partition.
+  std::vector<Partition*> hash_partitions_;
+
+  /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case 
where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this 
array will
+  /// point to the hash table that is a part of a single in-memory partition.
+  HashTable* hash_tbls_[PARTITION_FANOUT];
+
+  /// All partitions that have been spilled and need further processing.
+  std::deque<Partition*> spilled_partitions_;
+
+  /// All partitions that are aggregated and can just return the results in 
GetNext().
+  /// After consuming all the input, hash_partitions_ is split into 
spilled_partitions_
+  /// and aggregated_partitions_, depending on if it was spilled or not.
+  std::deque<Partition*> aggregated_partitions_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  /// The hash table and streams (aggregated and unaggregated) for an 
individual
+  /// partition. The streams of each partition always (i.e. regardless of 
level)
+  /// initially use small buffers. Streaming pre-aggregations do not spill and 
do not
+  /// require an unaggregated stream.
+  struct Partition {
+    Partition(GroupingAggregator* parent, int level, int idx)
+      : parent(parent), is_closed(false), level(level), idx(idx) {}
+
+    ~Partition();
+
+    /// Initializes aggregated_row_stream and unaggregated_row_stream (if a 
spilling
+    /// aggregation), allocating one buffer for each. Spilling merge 
aggregations must
+    /// have enough reservation for the initial buffer for the stream, so this 
should
+    /// not fail due to OOM. Preaggregations do not reserve any buffers: if a preaggregation 
does not
+    /// have enough reservation for the initial buffer, the aggregated row 
stream is not
+    /// created and an OK status is returned.
+    Status InitStreams() WARN_UNUSED_RESULT;
+
+    /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
+    /// Sets 'got_memory' to true if the hash table was initialised or false 
on OOM.
+    Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT;
+
+    /// Called in case we need to serialize aggregated rows. This step 
effectively does
+    /// a merge aggregation in this aggregator.
+    Status SerializeStreamForSpilling() WARN_UNUSED_RESULT;
+
+    /// Closes this partition. If finalize_rows is true, this iterates over 
all rows
+    /// in aggregated_row_stream and finalizes them (this is only used in the 
cancellation
+    /// path).
+    void Close(bool finalize_rows);
+
+    /// Spill this partition. 'more_aggregate_rows' = true means that more 
aggregate rows
+    /// may be appended to the partition before appending unaggregated 
rows. On
+    /// success, one of the streams is left with a write iterator: the 
aggregated stream
+    /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
+    Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT;
+
+    bool is_spilled() const { return hash_tbl.get() == nullptr; }  // True whenever 'hash_tbl' is NULL.
+
+    GroupingAggregator* parent;  // The aggregator passed to the constructor.
+
+    /// If true, this partition is closed and there is nothing left to do.
+    bool is_closed;
+
+    /// How many times rows in this partition have been repartitioned. 
Partitions created
+    /// from the aggregator's input are level 0, 1 after the first 
repartitionining, etc.
+    const int level;
+
+    /// The index of this partition within 'hash_partitions_' at its level.
+    const int idx;
+
+    /// Hash table for this partition.
+    /// Can be NULL if this partition is no longer maintaining a hash table 
(i.e.
+    /// is spilled or we are passing through all rows for this partition).
+    std::unique_ptr<HashTable> hash_tbl;
+
+    /// Clone of parent's agg_fn_evals_. Permanent allocations come from
+    /// 'agg_fn_perm_pool' and result allocations come from 
'expr_results_pool_'.
+    std::vector<AggFnEvaluator*> agg_fn_evals;
+
+    /// Pool for permanent allocations for this partition's 'agg_fn_evals'. 
Freed at the
+    /// same times as 'agg_fn_evals' are closed: either when the partition is 
closed or
+    /// when it is spilled.
+    std::unique_ptr<MemPool> agg_fn_perm_pool;
+
+    /// Tuple stream used to store aggregated rows. When the partition is not 
spilled,
+    /// (meaning the hash table is maintained), this stream is pinned and 
contains the
+    /// memory referenced by the hash table. When it is spilled, this consumes 
reservation
+    /// for a write buffer only during repartitioning of aggregated rows.
+    ///
+    /// For streaming preaggs, this may be NULL if sufficient memory is not 
available.
+    /// In that case hash_tbl is also NULL and all rows for the partition will 
be passed
+    /// through.
+    std::unique_ptr<BufferedTupleStream> aggregated_row_stream;
+
+    /// Unaggregated rows that are spilled. Always NULL for streaming 
pre-aggregations.
+    /// Always unpinned. Has a write buffer allocated when the partition is 
spilled and
+    /// unaggregated rows are being processed.
+    std::unique_ptr<BufferedTupleStream> unaggregated_row_stream;
+  };
+
+  /// Stream used to store serialized spilled rows. Only used if 
needs_serialize_
+  /// is set. This stream is never pinned and only used in Partition::Spill as
+  /// a temporary buffer.
+  std::unique_ptr<BufferedTupleStream> serialize_stream_;
+
+  /// Looks up the cached hash table for 'partition_idx' in 'hash_tbls_'. A DCHECK
+  /// confirms that the cached pointer matches the table owned by the corresponding
+  /// entry of 'hash_partitions_' before it is returned.
+  HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
+    HashTable* cached_tbl = hash_tbls_[partition_idx];
+    DCHECK_EQ(cached_tbl, hash_partitions_[partition_idx]->hash_tbl.get());
+    return cached_tbl;
+  }
+
+  /// Copies grouping values stored in 'ht_ctx_' that were computed over 
'current_row_'
+  /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their 
initial
+  /// values. Returns NULL if there was not enough memory to allocate the 
tuple or errors
+  /// occurred. In which case, 'status' is set. Allocates tuple and var-len 
data for
+  /// grouping exprs from stream. Var-len data for aggregate exprs is 
allocated from the
+  /// FunctionContexts, so is stored outside the stream. If stream's small 
buffers get
+  /// full, it will attempt to switch to IO-buffers.
+  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& 
agg_fn_evals,
+      BufferedTupleStream* stream, Status* status) noexcept;
+
+  /// Constructs intermediate tuple, allocating memory from pool instead of 
the stream.
+  /// Returns NULL and sets status if there is not enough memory to allocate 
the tuple.
+  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& 
agg_fn_evals,
+      MemPool* pool, Status* status) noexcept;
+
+  /// Returns the number of bytes of variable-length data for the grouping 
values stored
+  /// in 'ht_ctx_'.
+  int GroupingExprsVarlenSize();
+
+  /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_'
+  /// that were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the
+  /// var-len data into buffer. 'buffer' points to the start of a buffer of at least
+  /// the size of the variable-length data: 'varlen_size'.
+  void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int 
varlen_size);
+
+  /// Processes a batch of rows. This is the core function of the algorithm. 
We partition
+  /// the rows into hash_partitions_, spilling as necessary.
+  /// If AGGREGATED_ROWS is true, it means that the rows in the batch are 
already
+  /// pre-aggregated.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
+  ///     hash table buckets will be prefetched based on the hash values 
computed. Note
+  ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
+  ///
+  /// This function is replaced by codegen. We pass in ht_ctx_.get() as an 
argument for
+  /// performance.
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE AddBatchImpl(RowBatch* batch, TPrefetchMode::type 
prefetch_mode,
+      HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
+
+  /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the 
results in
+  /// the expression values cache in 'ht_ctx'. The number of rows evaluated 
depends on
+  /// the capacity of the cache. 'prefetch_mode' specifies the prefetching 
mode in use.
+  /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes 
will be
+  /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
+  template <bool AGGREGATED_ROWS>
+  void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx,
+      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
+
+  /// This function processes each individual row in AddBatchImpl(). Must be 
inlined into
+  /// AddBatchImpl for codegen to substitute function calls with codegen'd 
versions.
+  /// May spill partitions if not enough memory is available.
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE ProcessRow(
+      TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
+
+  /// Create a new intermediate tuple in partition, initialized with row. 
ht_ctx is
+  /// the context for the partition's hash table and hash is the precomputed 
hash of
+  /// the row. The row can be an unaggregated or aggregated row depending on
+  /// AGGREGATED_ROWS. Spills partitions if necessary to append the new 
intermediate
+  /// tuple to the partition's stream. Must be inlined into AddBatchImpl for 
codegen
+  /// to substitute function calls with codegen'd versions.  insert_it is an 
iterator
+  /// for insertion returned from HashTable::FindBuildRowBucket().
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* 
row,
+      uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT;
+
+  /// Append a row to a spilled partition. The row may be aggregated or 
unaggregated
+  /// according to AGGREGATED_ROWS. May spill partitions if needed to append 
the row
+  /// buffers.
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE AppendSpilledRow(
+      Partition* partition, TupleRow* row) WARN_UNUSED_RESULT;
+
+  /// Reads all the rows from input_stream and process them by calling 
AddBatchImpl().
+  template <bool AGGREGATED_ROWS>
+  Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
+
+  /// Get rows for the next rowbatch from the next partition. Sets 
'partition_eos_' to
+  /// true if all rows from all partitions have been returned or the limit is 
reached.
+  Status GetRowsFromPartition(
+      RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
+
+  /// Return true if we should keep expanding hash tables in the preagg. If 
false,
+  /// the preagg should pass through any rows it can't fit in its tables.
+  bool ShouldExpandPreaggHashTables() const;
+
+  /// Streaming processing of in_batch from child. Rows from child are either 
aggregated
+  /// into the hash table or added to 'out_batch' in the intermediate tuple 
format.
+  /// 'in_batch' is processed entirely, and 'out_batch' must have enough 
capacity to
+  /// store all of the rows in 'in_batch'.
+  /// 'needs_serialize' is an argument so that codegen can replace it with a 
constant,
+  ///     rather than using the member variable 'needs_serialize_'.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
+  ///     hash table buckets will be prefetched based on the hash values 
computed. Note
+  ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
+  /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the 
number of
+  ///     additional rows that can be added to the hash table per partition. 
It is updated
+  ///     by AddBatchStreamingImpl() when it inserts new rows.
+  /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the 
optimiser.
+  Status AddBatchStreamingImpl(bool needs_serialize, TPrefetchMode::type 
prefetch_mode,
+      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
+      int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
+
+  /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' 
for streaming
+  /// aggregation. The input row must have been evaluated with 'ht_ctx', with 
'hash' set
+  /// to the corresponding hash. If the tuple already exists in the hash 
table, update
+  /// the tuple and return true. Otherwise try to create a new entry in the 
hash table,
+  /// returning true if successful or false if the table is full. 
'remaining_capacity'
+  /// keeps track of how many more entries can be added to the hash table so 
we can avoid
+  /// retrying inserts. It is decremented if an insert succeeds and set to 
zero if an
+  /// insert fails. If an error occurs, returns false and sets 'status'.
+  bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* 
partition,
+      HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* 
remaining_capacity,
+      Status* status) WARN_UNUSED_RESULT;
+
+  /// Initializes hash_partitions_. 'level' is the level for the partitions to 
create.
+  /// If 'single_partition_idx' is provided, it must be a number in range
+  /// [0, PARTITION_FANOUT), and only that partition is created - all others 
point to it.
+  /// Also sets ht_ctx_'s level to 'level'.
+  Status CreateHashPartitions(
+      int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;
+
+  /// Ensure that hash tables for all in-memory partitions are large enough to 
fit
+  /// 'num_rows' additional hash table entries. If there is not enough memory 
to
+  /// resize the hash tables, may spill partitions. 'aggregated_rows' is true 
if
+  /// we're currently partitioning aggregated rows.
+  Status CheckAndResizeHashPartitions(
+      bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) 
WARN_UNUSED_RESULT;
+
+  /// Prepares the next partition to return results from. On return, this function
+  /// initializes output_iterator_ and output_partition_. This either removes
+  /// a partition from aggregated_partitions_ (and is done) or removes the next
+  /// partition from spilled_partitions_ and repartitions it.
+  Status NextPartition() WARN_UNUSED_RESULT;
+
+  /// Tries to build the first partition in 'spilled_partitions_'.
+  /// If successful, sets *built_partition to the partition. The caller owns the
+  /// partition and is responsible for closing it. If unsuccessful because the partition
+  /// could not fit in memory, sets *built_partition to NULL and pushes the spilled
+  /// partition back onto the head of 'spilled_partitions_' so it can be processed by
+  /// RepartitionSpilledPartition().
+  Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT;
+
+  /// Repartitions the first partition in 'spilled_partitions_' into 
PARTITION_FANOUT
+  /// output partitions. On success, each output partition is either:
+  /// * closed, if no rows were added to the partition.
+  /// * in 'spilled_partitions_', if the partition spilled.
+  /// * in 'aggregated_partitions_', if the output partition was not spilled.
+  Status RepartitionSpilledPartition() WARN_UNUSED_RESULT;
+
+  /// Picks a partition from 'hash_partitions_' to spill. 
'more_aggregate_rows' is passed
+  /// to Partition::Spill() when spilling the partition. See the 
Partition::Spill()
+  /// comment for further explanation.
+  Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT;
+
+  /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
+  /// spilled_partitions_. Partitions moved to spilled_partitions_ are 
unpinned.
+  /// input_rows is the number of input rows that have been repartitioned.
+  /// Used for diagnostics.
+  Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT;
+
+  /// Adds a partition to the front of 'spilled_partitions_' for later processing.
+  /// 'spilled_partitions_' is used as a LIFO so that more finely partitioned partitions
+  /// are processed first. This allows us to delete pages earlier and bottom out the
+  /// recursion earlier, and also improves time locality of access to spilled data on
+  /// disk.
+  void PushSpilledPartition(Partition* partition);
+
+  /// Calls Close() on every Partition in 'aggregated_partitions_',
+  /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists,
+  /// the vector and the partition pool.
+  void ClosePartitions();
+
+  /// Calls finalize on all tuples starting at 'it'.
+  void CleanupHashTbl(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator 
it);
+
+  /// Clears 'expr_results_pool_' and returns the result of 
state->CheckQueryState().
+  /// Aggregators should call this periodically, e.g. once per input row 
batch. This
+  /// should not be called outside the main execution thread.
+  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more 
details.
+  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
+  /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
+  /// This function will modify the loop, substituting the statically compiled functions
+  /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened
+  /// function.
+  /// Assumes AGGREGATED_ROWS = false.
+  Status CodegenAddBatchImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
+
+  /// Codegen the materialization loop for streaming preaggregations.
+  /// 'add_batch_streaming_impl_fn_' will be updated with the codegened 
function.
+  Status CodegenAddBatchStreamingImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
+
+  /// Compute minimum buffer reservation for grouping aggregations.
+  /// We need one buffer per partition, which is used either as the write 
buffer for the
+  /// aggregated stream or the unaggregated stream. We need an additional 
buffer to read
+  /// the stream we are currently repartitioning. The read buffer needs to be 
a max-sized
+  /// buffer to hold a max-sized row and we need one max-sized write buffer 
that is used
+  /// temporarily to append a row to any stream.
+  ///
+  /// If we need to serialize, we need an additional buffer while spilling a partition
+  /// as the partition's aggregate stream needs to be serialized and rewritten.
+  /// We do not spill streaming preaggregations, so we do not need to reserve 
any buffers.
+  int64_t MinReservation() const;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_GROUPING_AGGREGATOR_H

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator-ir.cc 
b/be/src/exec/non-grouping-aggregator-ir.cc
new file mode 100644
index 0000000..335cbc7
--- /dev/null
+++ b/be/src/exec/non-grouping-aggregator-ir.cc
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/non-grouping-aggregator.h"
+
+#include "runtime/row-batch.h"
+
+using namespace impala;
+
+/// Applies UpdateTuple() to every row of 'batch', starting at row 0, accumulating into
+/// the singleton output tuple. Always returns Status::OK().
+Status NonGroupingAggregator::AddBatchImpl(RowBatch* batch) {
+  // Read the destination tuple once up front; every row in the batch updates it.
+  Tuple* dst_tuple = singleton_output_tuple_;
+  FOREACH_ROW(batch, 0, row_it) {
+    UpdateTuple(agg_fn_evals_.data(), dst_tuple, row_it.Get());
+  }
+  return Status::OK();
+}

Reply via email to