Repository: impala
Updated Branches:
  refs/heads/master dde930830 -> e07fbc1b6


http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
deleted file mode 100644
index c522c1b..0000000
--- a/be/src/exec/partitioned-aggregation-node.h
+++ /dev/null
@@ -1,734 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
-#define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
-
-#include <deque>
-
-#include <boost/scoped_ptr.hpp>
-
-#include "exec/exec-node.h"
-#include "exec/hash-table.h"
-#include "runtime/buffered-tuple-stream.h"
-#include "runtime/bufferpool/suballocator.h"
-#include "runtime/descriptors.h" // for TupleId
-#include "runtime/mem-pool.h"
-#include "runtime/string-value.h"
-
-namespace llvm {
-class BasicBlock;
-class Function;
-class Value;
-}
-
-namespace impala {
-
-class AggFn;
-class AggFnEvaluator;
-class CodegenAnyVal;
-class LlvmCodeGen;
-class LlvmBuilder;
-class RowBatch;
-class RuntimeState;
-struct StringValue;
-class Tuple;
-class TupleDescriptor;
-class SlotDescriptor;
-
-/// Node for doing partitioned hash aggregation.
-/// This node consumes the input (which can be from the child(0) or a spilled 
partition).
-///  1. Each row is hashed and we pick a dst partition (hash_partitions_).
-///  2. If the dst partition is not spilled, we probe into the partitions hash 
table
-///  to aggregate/insert the row.
-///  3. If the partition is already spilled, the input row is spilled.
-///  4. When all the input is consumed, we walk hash_partitions_, put the 
spilled ones
-///  into spilled_partitions_ and the non-spilled ones into 
aggregated_partitions_.
-///  aggregated_partitions_ contain partitions that are fully processed and 
the result
-///  can just be returned. Partitions in spilled_partitions_ need to be 
repartitioned
-///  and we just repeat these steps.
-//
-/// Each partition contains these structures:
-/// 1) Hash Table for aggregated rows. This contains just the hash table 
directory
-///    structure but not the rows themselves. This is NULL for spilled 
partitions when
-///    we stop maintaining the hash table.
-/// 2) MemPool for var-len result data for rows in the hash table. If the 
aggregate
-///    function returns a string, we cannot append it to the tuple stream as 
that
-///    structure is immutable. Instead, when we need to spill, we sweep and 
copy the
-///    rows into a tuple stream.
-/// 3) Aggregated tuple stream for rows that are/were in the hash table. This 
stream
-///    contains rows that are aggregated. When the partition is not spilled, 
this stream
-///    is pinned and contains the memory referenced by the hash table.
-///    In the case where the aggregate function does not return a string 
(meaning the
-///    size of all the slots is known when the row is constructed), this 
stream contains
-///    all the memory for the result rows and the MemPool (2) is not used.
-/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
-///    Rows in this stream always have child(0)'s layout.
-///
-/// Buffering: Each stream and hash table needs to maintain at least one 
buffer when
-/// it is being read or written. The streams for a given agg use a uniform 
buffer size,
-/// except when processing rows larger than that buffer size. In that case, 
the agg uses
-/// BufferedTupleStream's variable buffer size support to handle larger rows 
up to the
-/// maximum row size. Only two max-sized buffers are needed for the agg to 
spill: one
-/// to hold rows being read from a spilled input stream and another for a 
temporary write
-/// buffer when adding a row to an output stream.
-///
-/// Two-phase aggregation: we support two-phase distributed aggregations, where
-/// pre-aggregations attempt to reduce the size of data before shuffling data 
across the
-/// network to be merged by the merge aggregation node. This exec node 
supports a
-/// streaming mode for pre-aggregations where it maintains a hash table of 
aggregated
-/// rows, but can pass through unaggregated rows (after transforming them into 
the
-/// same tuple format as aggregated rows) when a heuristic determines that it 
is better
-/// to send rows across the network instead of consuming additional memory and 
CPU
-/// resources to expand its hash table. The planner decides whether a given
-/// pre-aggregation should use the streaming preaggregation algorithm or the 
same
-/// blocking aggregation algorithm as used in merge aggregations.
-/// TODO: make this less of a heuristic by factoring in the cost of the 
exchange vs the
-/// cost of the pre-aggregation.
-///
-/// If there are no grouping expressions, there is only a single output row 
for both
-/// preaggregations and merge aggregations. This case is handled separately to 
avoid
-/// building hash tables. There is also no need to do streaming 
preaggregations.
-///
-/// Handling memory pressure: the node uses two different strategies for 
responding to
-/// memory pressure, depending on whether it is a streaming pre-aggregation or 
not. If
-/// the node is a streaming preaggregation, it stops growing its hash table 
further by
-/// converting unaggregated rows into the aggregated tuple format and passing 
them
-/// through. If the node is not a streaming pre-aggregation, it responds to 
memory
-/// pressure by spilling partitions to disk.
-///
-/// TODO: Buffer rows before probing into the hash table?
-/// TODO: After spilling, we can still maintain a very small hash table just 
to remove
-/// some number of rows (from likely going to disk).
-/// TODO: Consider allowing to spill the hash table structure in addition to 
the rows.
-/// TODO: Do we want to insert a buffer before probing into the partition's 
hash table?
-/// TODO: Use a prefetch/batched probe interface.
-/// TODO: Return rows from the aggregated_row_stream rather than the HT.
-/// TODO: Think about spilling heuristic.
-/// TODO: When processing a spilled partition, we have a lot more information 
and can
-/// size the partitions/hash tables better.
-/// TODO: Start with unpartitioned (single partition) and switch to 
partitioning and
-/// spilling only if the size gets large, say larger than the LLC.
-/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and 
ctx.
-/// There are so many contexts in use that a plain "ctx" variable should never 
be used.
-/// Likewise, it's easy to mix up the agg fn ctxs; there should be a way to 
simplify this.
-/// TODO: support an Init() method with an initial value in the UDAF interface.
-class PartitionedAggregationNode : public ExecNode {
- public:
-  PartitionedAggregationNode(ObjectPool* pool,
-      const TPlanNode& tnode, const DescriptorTbl& descs);
-
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
-
-  static const char* LLVM_CLASS_NAME;
-
- protected:
-  virtual std::string DebugString(int indentation_level) const;
-  virtual void DebugString(int indentation_level, std::stringstream* out) 
const;
-
- private:
-  struct Partition;
-
-  /// Number of initial partitions to create. Must be a power of 2.
-  static const int PARTITION_FANOUT = 16;
-
-  /// Needs to be log2(PARTITION_FANOUT).
-  /// We use the upper bits to pick the partition and lower bits in the HT.
-  /// TODO: different hash functions here too? We don't need that many bits to 
pick
-  /// the partition so this might be okay.
-  static const int NUM_PARTITIONING_BITS = 4;
-
-  /// Maximum number of times we will repartition. The maximum build table we 
can process
-  /// (if we have enough scratch disk space) in case there is no skew is:
-  ///  MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
-  /// In the case where there is skew, repartitioning is unlikely to help 
(assuming a
-  /// reasonable hash function).
-  /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
-  /// TODO: we can revisit and try harder to explicitly detect skew.
-  static const int MAX_PARTITION_DEPTH = 16;
-
-  /// Default initial number of buckets in a hash table.
-  /// TODO: rethink this ?
-  static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
-
-  /// Codegen doesn't allow for automatic Status variables because then 
exception
-  /// handling code is needed to destruct the Status, and our function call 
substitution
-  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. 
Work around that by
-  /// placing the Status here so exceptions won't need to destruct it.
-  /// TODO: fix IMPALA-1948 and remove this.
-  Status process_batch_status_;
-
-  /// Tuple into which Update()/Merge()/Serialize() results are stored.
-  TupleId intermediate_tuple_id_;
-  TupleDescriptor* intermediate_tuple_desc_;
-
-  /// Row with the intermediate tuple as its only tuple.
-  /// Construct a new row desc for preparing the build exprs because neither 
the child's
-  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
-  /// in a single-node plan with an intermediate tuple different from the 
output tuple.
-  /// Lives in the query state's obj_pool.
-  RowDescriptor intermediate_row_desc_;
-
-  /// Tuple into which Finalize() results are stored. Possibly the same as
-  /// the intermediate tuple.
-  TupleId output_tuple_id_;
-  TupleDescriptor* output_tuple_desc_;
-
-  /// Certain aggregates require a finalize step, which is the final step of 
the
-  /// aggregate after consuming all input rows. The finalize step converts the 
aggregate
-  /// value into its final form. This is true if this node contains an aggregate 
that
-  /// requires a finalize step.
-  const bool needs_finalize_;
-
-  /// True if this is first phase of a two-phase distributed aggregation for 
which we
-  /// are doing a streaming preaggregation.
-  const bool is_streaming_preagg_;
-
-  /// True if any of the evaluators require the serialize step.
-  bool needs_serialize_;
-
-  /// The list of all aggregate operations for this exec node.
-  std::vector<AggFn*> agg_fns_;
-
-  /// Evaluators for each aggregate function. If this is a grouping 
aggregation, these
-  /// evaluators are only used to create cloned per-partition evaluators. The 
cloned
-  /// evaluators are then used to evaluate the functions. If this is a 
non-grouping
-  /// aggregation these evaluators are used directly to evaluate the functions.
-  ///
-  /// Permanent and result allocations for these evaluators are allocated from
-  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
-  std::vector<AggFnEvaluator*> agg_fn_evals_;
-
-  /// Exprs used to evaluate input rows
-  std::vector<ScalarExpr*> grouping_exprs_;
-
-  /// Exprs used to insert constructed aggregation tuple into the hash table.
-  /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<ScalarExpr*> build_exprs_;
-
-  /// Indices of grouping exprs with var-len string types in grouping_exprs_.
-  /// We need to do more work for var-len expressions when allocating and 
spilling rows.
-  /// All var-len grouping exprs have type string.
-  std::vector<int> string_grouping_exprs_;
-
-  RuntimeState* state_;
-
-  /// Allocator for hash table memory.
-  boost::scoped_ptr<Suballocator> ht_allocator_;
-
-  /// MemPool used to allocate memory for when we don't have grouping and 
don't initialize
-  /// the partitioning structures, or during Close() when creating new output 
tuples.
-  /// For non-grouping aggregations, the ownership of the pool's memory is 
transferred
-  /// to the output batch on eos. The pool should not be Reset() to allow 
amortizing
-  /// memory allocation over a series of Reset()/Open()/GetNext()* calls.
-  boost::scoped_ptr<MemPool> singleton_tuple_pool_;
-
-  /// The current partition and iterator to the next row in its hash table 
that we need
-  /// to return in GetNext()
-  Partition* output_partition_;
-  HashTable::Iterator output_iterator_;
-
-  typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, 
RowBatch*);
-  /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is 
disabled.
-  ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
-
-  typedef Status (*ProcessBatchFn)(
-      PartitionedAggregationNode*, RowBatch*, TPrefetchMode::type, 
HashTableCtx*);
-  /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
-  ProcessBatchFn process_batch_fn_;
-
-  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
-      TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, 
int[PARTITION_FANOUT]);
-  /// Jitted ProcessBatchStreaming function pointer.  Null if codegen is 
disabled.
-  ProcessBatchStreamingFn process_batch_streaming_fn_;
-
-  /// Time spent processing the child rows
-  RuntimeProfile::Counter* build_timer_;
-
-  /// Total time spent resizing hash tables.
-  RuntimeProfile::Counter* ht_resize_timer_;
-
-  /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
-
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
-
-  /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
-
-  /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
-
-  /// Number of rows that have been repartitioned.
-  RuntimeProfile::Counter* num_row_repartitioned_;
-
-  /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
-
-  /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
-
-  /// The largest fraction after repartitioning. This is expected to be
-  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
-
-  /// Time spent in streaming preagg algorithm.
-  RuntimeProfile::Counter* streaming_timer_;
-
-  /// The number of rows passed through without aggregation.
-  RuntimeProfile::Counter* num_passthrough_rows_;
-
-  /// The estimated reduction of the preaggregation.
-  RuntimeProfile::Counter* preagg_estimated_reduction_;
-
-  /// Expose the minimum reduction factor to continue growing the hash tables.
-  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
-
-  /// The estimated number of input rows from the planner.
-  int64_t estimated_input_cardinality_;
-
-  /////////////////////////////////////////
-  /// BEGIN: Members that must be Reset()
-
-  /// Result of aggregation w/o GROUP BY.
-  /// Note: can be NULL even if there is no grouping if the result tuple is 0 
width
-  /// e.g. select 1 from table group by col.
-  Tuple* singleton_output_tuple_;
-  bool singleton_output_tuple_returned_;
-
-  /// Row batch used as argument to GetNext() for the child node 
preaggregations. Store
-  /// in node to avoid reallocating for every GetNext() call when streaming.
-  boost::scoped_ptr<RowBatch> child_batch_;
-
-  /// If true, no more rows to output from partitions.
-  bool partition_eos_;
-
-  /// True if no more rows to process from child.
-  bool child_eos_;
-
-  /// Used for hash-related functionality, such as evaluating rows and 
calculating hashes.
-  /// It also owns the evaluators for the grouping and build expressions used 
during hash
-  /// table insertion and probing.
-  boost::scoped_ptr<HashTableCtx> ht_ctx_;
-
-  /// Object pool that holds the Partition objects in hash_partitions_.
-  boost::scoped_ptr<ObjectPool> partition_pool_;
-
-  /// Current partitions we are partitioning into. IMPALA-5788: For the case 
where we
-  /// rebuild a spilled partition that fits in memory, all pointers in this 
vector will
-  /// point to a single in-memory partition.
-  std::vector<Partition*> hash_partitions_;
-
-  /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case 
where we
-  /// rebuild a spilled partition that fits in memory, all pointers in this 
array will
-  /// point to the hash table that is a part of a single in-memory partition.
-  HashTable* hash_tbls_[PARTITION_FANOUT];
-
-  /// All partitions that have been spilled and need further processing.
-  std::deque<Partition*> spilled_partitions_;
-
-  /// All partitions that are aggregated and can just return the results in 
GetNext().
-  /// After consuming all the input, hash_partitions_ is split into 
spilled_partitions_
-  /// and aggregated_partitions_, depending on if it was spilled or not.
-  std::deque<Partition*> aggregated_partitions_;
-
-  /// END: Members that must be Reset()
-  /////////////////////////////////////////
-
-  /// The hash table and streams (aggregated and unaggregated) for an 
individual
-  /// partition. The streams of each partition always (i.e. regardless of 
level)
-  /// initially use small buffers. Streaming pre-aggregations do not spill and 
do not
-  /// require an unaggregated stream.
-  struct Partition {
-    Partition(PartitionedAggregationNode* parent, int level, int idx)
-      : parent(parent), is_closed(false), level(level), idx(idx) {}
-
-    ~Partition();
-
-    /// Initializes aggregated_row_stream and unaggregated_row_stream (if a 
spilling
-    /// aggregation), allocating one buffer for each. Spilling merge 
aggregations must
-    /// have enough reservation for the initial buffer for the stream, so this 
should
-    /// not fail due to OOM. Preaggregations do not reserve any buffers: if 
there is not
-    /// enough reservation for the initial buffer, the aggregated row 
stream is not
-    /// created and an OK status is returned.
-    Status InitStreams() WARN_UNUSED_RESULT;
-
-    /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
-    /// Sets 'got_memory' to true if the hash table was initialised or false 
on OOM.
-    Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT;
-
-    /// Called in case we need to serialize aggregated rows. This step 
effectively does
-    /// a merge aggregation in this node.
-    Status SerializeStreamForSpilling() WARN_UNUSED_RESULT;
-
-    /// Closes this partition. If finalize_rows is true, this iterates over 
all rows
-    /// in aggregated_row_stream and finalizes them (this is only used in the 
cancellation
-    /// path).
-    void Close(bool finalize_rows);
-
-    /// Spill this partition. 'more_aggregate_rows' = true means that more 
aggregate rows
-    /// may be appended to the partition before appending unaggregated 
rows. On
-    /// success, one of the streams is left with a write iterator: the 
aggregated stream
-    /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
-    Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT;
-
-    bool is_spilled() const { return hash_tbl.get() == NULL; }
-
-    PartitionedAggregationNode* parent;
-
-    /// If true, this partition is closed and there is nothing left to do.
-    bool is_closed;
-
-    /// How many times rows in this partition have been repartitioned. 
Partitions created
-    /// from the node's children's input are level 0, 1 after the first 
repartitioning,
-    /// etc.
-    const int level;
-
-    /// The index of this partition within 'hash_partitions_' at its level.
-    const int idx;
-
-    /// Hash table for this partition.
-    /// Can be NULL if this partition is no longer maintaining a hash table 
(i.e.
-    /// is spilled or we are passing through all rows for this partition).
-    boost::scoped_ptr<HashTable> hash_tbl;
-
-    /// Clone of parent's agg_fn_evals_. Permanent allocations come from
-    /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's
-    /// 'expr_results_pool_'.
-    std::vector<AggFnEvaluator*> agg_fn_evals;
-
-    /// Pool for permanent allocations for this partition's 'agg_fn_evals'. 
Freed at the
-    /// same times as 'agg_fn_evals' are closed: either when the partition is 
closed or
-    /// when it is spilled.
-    boost::scoped_ptr<MemPool> agg_fn_perm_pool;
-
-    /// Tuple stream used to store aggregated rows. When the partition is not 
spilled,
-    /// (meaning the hash table is maintained), this stream is pinned and 
contains the
-    /// memory referenced by the hash table. When it is spilled, this consumes 
reservation
-    /// for a write buffer only during repartitioning of aggregated rows.
-    ///
-    /// For streaming preaggs, this may be NULL if sufficient memory is not 
available.
-    /// In that case hash_tbl is also NULL and all rows for the partition will 
be passed
-    /// through.
-    boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;
-
-    /// Unaggregated rows that are spilled. Always NULL for streaming 
pre-aggregations.
-    /// Always unpinned. Has a write buffer allocated when the partition is 
spilled and
-    /// unaggregated rows are being processed.
-    boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
-  };
-
-  /// Stream used to store serialized spilled rows. Only used if 
needs_serialize_
-  /// is set. This stream is never pinned and only used in Partition::Spill as 
a
-  /// temporary buffer.
-  boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
-
-  /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
-  HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
-    HashTable* ht = hash_tbls_[partition_idx];
-    DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
-    return ht;
-  }
-
-  /// Constructs singleton output tuple, allocating memory from pool.
-  Tuple* ConstructSingletonOutputTuple(
-      const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);
-
-  /// Copies grouping values stored in 'ht_ctx_' that were computed over 
'current_row_'
-  /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their 
initial
-  /// values. Returns NULL if there was not enough memory to allocate the 
tuple or errors
-  /// occurred. In which case, 'status' is set. Allocates tuple and var-len 
data for
-  /// grouping exprs from stream. Var-len data for aggregate exprs is 
allocated from the
-  /// FunctionContexts, so is stored outside the stream. If stream's small 
buffers get
-  /// full, it will attempt to switch to IO-buffers.
-  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& 
agg_fn_evals,
-      BufferedTupleStream* stream, Status* status) noexcept;
-
-  /// Constructs intermediate tuple, allocating memory from pool instead of 
the stream.
-  /// Returns NULL and sets status if there is not enough memory to allocate 
the tuple.
-  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& 
agg_fn_evals,
-      MemPool* pool, Status* status) noexcept;
-
-  /// Returns the number of bytes of variable-length data for the grouping 
values stored
-  /// in 'ht_ctx_'.
-  int GroupingExprsVarlenSize();
-
-  /// Initializes intermediate tuple by copying grouping values stored in 
'ht_ctx_' that
-  /// were computed over 'current_row_' using 'grouping_expr_evals_'. 
Writes the
-  /// var-len data into buffer. 'buffer' points to the start of a buffer of at 
least the
-  /// size of the variable-length data: 'varlen_size'.
-  void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int 
varlen_size);
-
-  /// Initializes the aggregate function slots of an intermediate tuple.
-  /// Any var-len data is allocated from the FunctionContexts.
-  void InitAggSlots(const std::vector<AggFnEvaluator*>& agg_fn_evals,
-      Tuple* intermediate_tuple);
-
-  /// Updates the given aggregation intermediate tuple with aggregation values 
computed
-  /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls 
Update() or
-  /// Merge() is controlled by the evaluator itself, unless enforced 
explicitly by passing
-  /// in is_merge == true.  The override is needed to merge spilled and 
non-spilled rows
-  /// belonging to the same partition independent of whether the agg fn 
evaluators have
-  /// is_merge() == true.
-  /// This function is replaced by codegen (which is why we don't use a vector 
argument
-  /// for agg_fn_evals). Any var-len data is allocated from the 
FunctionContexts.
-  /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
-  void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
-      bool is_merge = false) noexcept;
-
-  /// Called on the intermediate tuple of each group after all input rows have 
been
-  /// consumed and aggregated. Computes the final aggregate values to be 
returned in
-  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
-  /// For the Finalize() case if the output tuple is different from the 
intermediate
-  /// tuple, then a new tuple is allocated from 'pool' to hold the final 
result.
-  /// Grouping values are copied into the output tuple and the output 
tuple holding
-  /// the finalized/serialized aggregate values is returned.
-  /// TODO: Coordinate the allocation of new tuples with the release of memory
-  /// so as not to make memory consumption blow up.
-  Tuple* GetOutputTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
-      Tuple* tuple, MemPool* pool);
-
-  /// Do the aggregation for all tuple rows in the batch when there is no 
grouping.
-  /// This function is replaced by codegen.
-  Status ProcessBatchNoGrouping(RowBatch* batch) WARN_UNUSED_RESULT;
-
-  /// Processes a batch of rows. This is the core function of the algorithm. 
We partition
-  /// the rows into hash_partitions_, spilling as necessary.
-  /// If AGGREGATED_ROWS is true, it means that the rows in the batch are 
already
-  /// pre-aggregated.
-  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
-  ///     hash table buckets will be prefetched based on the hash values 
computed. Note
-  ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
-  //
-  /// This function is replaced by codegen. We pass in ht_ctx_.get() as an 
argument for
-  /// performance.
-  template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, TPrefetchMode::type 
prefetch_mode,
-      HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
-
-  /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the 
results in
-  /// the expression values cache in 'ht_ctx'. The number of rows evaluated 
depends on
-  /// the capacity of the cache. 'prefetch_mode' specifies the prefetching 
mode in use.
-  /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes 
will be
-  /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
-  template<bool AGGREGATED_ROWS>
-  void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx,
-      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
-
-  /// This function processes each individual row in ProcessBatch(). Must be 
inlined into
-  /// ProcessBatch for codegen to substitute function calls with codegen'd 
versions.
-  /// May spill partitions if not enough memory is available.
-  template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessRow(
-      TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
-
-  /// Create a new intermediate tuple in partition, initialized with row. 
ht_ctx is
-  /// the context for the partition's hash table and hash is the precomputed 
hash of
-  /// the row. The row can be an unaggregated or aggregated row depending on
-  /// AGGREGATED_ROWS. Spills partitions if necessary to append the new 
intermediate
-  /// tuple to the partition's stream. Must be inlined into ProcessBatch for 
codegen
-  /// to substitute function calls with codegen'd versions.  insert_it is an 
iterator
-  /// for insertion returned from HashTable::FindBuildRowBucket().
-  template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* 
row,
-      uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT;
-
-  /// Append a row to a spilled partition. The row may be aggregated or 
unaggregated
-  /// according to AGGREGATED_ROWS. May spill partitions if needed to append 
the row
-  /// buffers.
-  template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AppendSpilledRow(
-      Partition* partition, TupleRow* row) WARN_UNUSED_RESULT;
-
-  /// Reads all the rows from input_stream and process them by calling 
ProcessBatch().
-  template <bool AGGREGATED_ROWS>
-  Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
-
-  /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
-  void GetSingletonOutput(RowBatch* row_batch);
-
-  /// Get rows for the next rowbatch from the next partition. Sets 
'partition_eos_' to
-  /// true if all rows from all partitions have been returned or the limit is 
reached.
-  Status GetRowsFromPartition(
-      RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
-
-  /// Get output rows from child for streaming pre-aggregation. Aggregates 
some rows with
-  /// hash table and passes through other rows converted into the intermediate
-  /// tuple format. Sets 'child_eos_' once all rows from child have been 
returned.
-  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) 
WARN_UNUSED_RESULT;
-
-  /// Return true if we should keep expanding hash tables in the preagg. If 
false,
-  /// the preagg should pass through any rows it can't fit in its tables.
-  bool ShouldExpandPreaggHashTables() const;
-
-  /// Streaming processing of in_batch from child. Rows from child are either 
aggregated
-  /// into the hash table or added to 'out_batch' in the intermediate tuple 
format.
-  /// 'in_batch' is processed entirely, and 'out_batch' must have enough 
capacity to
-  /// store all of the rows in 'in_batch'.
-  /// 'needs_serialize' is an argument so that codegen can replace it with a 
constant,
-  ///     rather than using the member variable 'needs_serialize_'.
-  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
-  ///     hash table buckets will be prefetched based on the hash values 
computed. Note
-  ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
-  /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the 
number of
-  ///     additional rows that can be added to the hash table per partition. 
It is updated
-  ///     by ProcessBatchStreaming() when it inserts new rows.
-  /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the 
optimiser.
-  Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type 
prefetch_mode,
-      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
-      int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
-
-  /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' 
for streaming
-  /// aggregation. The input row must have been evaluated with 'ht_ctx', with 
'hash' set
-  /// to the corresponding hash. If the tuple already exists in the hash 
table, update
-  /// the tuple and return true. Otherwise try to create a new entry in the 
hash table,
-  /// returning true if successful or false if the table is full. 
'remaining_capacity'
-  /// keeps track of how many more entries can be added to the hash table so 
we can avoid
-  /// retrying inserts. It is decremented if an insert succeeds and set to 
zero if an
-  /// insert fails. If an error occurs, returns false and sets 'status'.
-  bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* 
partition,
-      HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* 
remaining_capacity,
-      Status* status) WARN_UNUSED_RESULT;
-
-  /// Initializes hash_partitions_. 'level' is the level for the partitions to 
create.
-  /// If 'single_partition_idx' is provided, it must be a number in range
-  /// [0, PARTITION_FANOUT), and only that partition is created - all others 
point to it.
-  /// Also sets ht_ctx_'s level to 'level'.
-  Status CreateHashPartitions(
-      int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;
-
-  /// Ensure that hash tables for all in-memory partitions are large enough to 
fit
-  /// 'num_rows' additional hash table entries. If there is not enough memory 
to
-  /// resize the hash tables, may spill partitions. 'aggregated_rows' is true 
if
-  /// we're currently partitioning aggregated rows.
-  Status CheckAndResizeHashPartitions(
-      bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) 
WARN_UNUSED_RESULT;
-
-  /// Prepares the next partition to return results from. On return, this 
function
-  /// initializes output_iterator_ and output_partition_. This either removes
-  /// a partition from aggregated_partitions_ (and is done) or removes the next
-  /// partition from spilled_partitions_ and repartitions it.
-  Status NextPartition() WARN_UNUSED_RESULT;
-
-  /// Tries to build the first partition in 'spilled_partitions_'.
-  /// If successful, set *built_partition to the partition. The caller owns 
the partition
-  /// and is responsible for closing it. If unsuccessful because the partition 
could not
-  /// fit in memory, set *built_partition to NULL and add the spilled 
partition to the
-  /// head of 'spilled_partitions_' so it can be processed by
-  /// RepartitionSpilledPartition().
-  Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT;
-
-  /// Repartitions the first partition in 'spilled_partitions_' into 
PARTITION_FANOUT
-  /// output partitions. On success, each output partition is either:
-  /// * closed, if no rows were added to the partition.
-  /// * in 'spilled_partitions_', if the partition spilled.
-  /// * in 'aggregated_partitions_', if the output partition was not spilled.
-  Status RepartitionSpilledPartition() WARN_UNUSED_RESULT;
-
-  /// Picks a partition from 'hash_partitions_' to spill. 
'more_aggregate_rows' is passed
-  /// to Partition::Spill() when spilling the partition. See the 
Partition::Spill()
-  /// comment for further explanation.
-  Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT;
-
-  /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
-  /// spilled_partitions_. Partitions moved to spilled_partitions_ are 
unpinned.
-  /// input_rows is the number of input rows that have been repartitioned.
-  /// Used for diagnostics.
-  Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT;
-
-  /// Adds a partition to the front of 'spilled_partitions_' for later 
processing.
-  /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions 
are processed
-  /// first. This allows us to delete pages earlier and bottom out the 
recursion
-  /// earlier and also improves time locality of access to spilled data on 
disk.
-  void PushSpilledPartition(Partition* partition);
-
-  /// Calls Close() on every Partition in 'aggregated_partitions_',
-  /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists,
-  /// the vector and the partition pool.
-  void ClosePartitions();
-
-  /// Calls finalize on all tuples starting at 'it'.
-  void CleanupHashTbl(const std::vector<AggFnEvaluator*>& agg_fn_evals,
-      HashTable::Iterator it);
-
-  /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
-  /// and returns the IR function in 'fn'. Returns non-OK status if codegen
-  /// is unsuccessful.
-  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
-      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
-
-  /// Codegen a call to a function implementing the UDA interface with input 
values
-  /// from 'input_vals'. 'dst_val' should contain the previous value of the 
aggregate
-  /// function, and 'updated_dst_val' is set to the new value after the Update 
or Merge
-  /// operation is applied. The instruction sequence for the UDA call is 
inserted at
-  /// the insert position of 'builder'.
-  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* 
agg_fn,
-      llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& 
input_vals,
-      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) 
WARN_UNUSED_RESULT;
-
-  /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
-  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) 
WARN_UNUSED_RESULT;
-
-  /// Codegen the non-streaming process row batch loop. The loop has already 
been
-  /// compiled to IR and loaded into the codegen object. UpdateAggTuple has 
also been
-  /// codegen'd to IR. This function will modify the loop substituting the 
statically
-  /// compiled functions with codegen'd ones. 'process_batch_fn_' or
-  /// 'process_batch_no_grouping_fn_' will be updated with the codegened 
function
-  /// depending on whether this is a grouping or non-grouping aggregation.
-  /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenProcessBatch(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
-
-  /// Codegen the materialization loop for streaming preaggregations.
-  /// 'process_batch_streaming_fn_' will be updated with the codegened 
function.
-  Status CodegenProcessBatchStreaming(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
-
-  /// Compute minimum buffer reservation for grouping aggregations.
-  /// We need one buffer per partition, which is used either as the write 
buffer for the
-  /// aggregated stream or the unaggregated stream. We need an additional 
buffer to read
-  /// the stream we are currently repartitioning. The read buffer needs to be 
a max-sized
-  /// buffer to hold a max-sized row and we need one max-sized write buffer 
that is used
-  /// temporarily to append a row to any stream.
-  ///
-  /// If we need to serialize, we need an additional buffer while spilling a 
partition
-  /// as the partition's aggregate stream needs to be serialized and rewritten.
-  /// Streaming preaggregations do not spill; they only reserve one buffer and a 
64KB hash table per partition.
-  int64_t MinReservation() const {
-    DCHECK(!grouping_exprs_.empty());
-    // Must be kept in sync with AggregationNode.computeNodeResourceProfile() 
in fe.
-    if (is_streaming_preagg_) {
-      // Reserve at least one buffer and a 64kb hash table per partition.
-      return (resource_profile_.spillable_buffer_size + 64 * 1024) * 
PARTITION_FANOUT;
-    }
-    int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
-    // Two of the buffers must fit the maximum row.
-    return resource_profile_.spillable_buffer_size * (num_buffers - 2) +
-          resource_profile_.max_row_buffer_size * 2;
-  }
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/streaming-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.cc 
b/be/src/exec/streaming-aggregation-node.cc
new file mode 100644
index 0000000..4ad7820
--- /dev/null
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/streaming-aggregation-node.h"
+
+#include <sstream>
+
+#include "gutil/strings/substitute.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+StreamingAggregationNode::StreamingAggregationNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs), child_eos_(false) {
+  DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts";
+  DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do 
grouping";
+  DCHECK(limit_ == -1) << "Preaggs have no limits";
+}
+
+Status StreamingAggregationNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  aggregator_.reset(new GroupingAggregator(this, pool_, tnode, 
state->desc_tbl()));
+  RETURN_IF_ERROR(aggregator_->Init(tnode, state));
+  runtime_profile_->AddChild(aggregator_->runtime_profile());
+  return Status::OK();
+}
+
+Status StreamingAggregationNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  aggregator_->SetDebugOptions(debug_options_);
+  RETURN_IF_ERROR(aggregator_->Prepare(state));
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
+  return Status::OK();
+}
+
+void StreamingAggregationNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+
+  aggregator_->Codegen(state);
+}
+
+Status StreamingAggregationNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  // Open the child before consuming resources in this node.
+  RETURN_IF_ERROR(child(0)->Open(state));
+  RETURN_IF_ERROR(ExecNode::Open(state));
+
+  RETURN_IF_ERROR(aggregator_->Open(state));
+
+  // Streaming preaggregations do all processing in GetNext().
+  return Status::OK();
+}
+
+Status StreamingAggregationNode::GetNext(
+    RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  if (ReachedLimit()) {
+    *eos = true;
+    return Status::OK();
+  }
+
+  bool aggregator_eos = false;
+  if (!child_eos_) {
+    // For streaming preaggregations, we process rows from the child as we go.
+    RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
+  } else {
+    RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, &aggregator_eos));
+  }
+
+  *eos = aggregator_eos && child_eos_;
+  num_rows_returned_ += row_batch->num_rows();
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status StreamingAggregationNode::GetRowsStreaming(
+    RuntimeState* state, RowBatch* out_batch) {
+  DCHECK(!child_eos_);
+
+  if (child_batch_ == nullptr) {
+    child_batch_.reset(
+        new RowBatch(child(0)->row_desc(), state->batch_size(), 
mem_tracker()));
+  }
+
+  do {
+    DCHECK_EQ(out_batch->num_rows(), 0);
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(QueryMaintenance(state));
+
+    RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
+
+    RETURN_IF_ERROR(aggregator_->AddBatchStreaming(state, out_batch, 
child_batch_.get()));
+    child_batch_->Reset(); // All rows from child_batch_ were processed.
+  } while (out_batch->num_rows() == 0 && !child_eos_);
+
+  if (child_eos_) {
+    child(0)->Close(state);
+    child_batch_.reset();
+    RETURN_IF_ERROR(aggregator_->InputDone());
+  }
+
+  return Status::OK();
+}
+
+Status StreamingAggregationNode::Reset(RuntimeState* state) {
+  DCHECK(false) << "Cannot reset preaggregation";
+  return Status("Cannot reset preaggregation");
+}
+
+void StreamingAggregationNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  aggregator_->Close(state);
+  child_batch_.reset();
+  ExecNode::Close(state);
+}
+
+void StreamingAggregationNode::DebugString(
+    int indentation_level, stringstream* out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "StreamingAggregationNode("
+       << "aggregator=" << aggregator_->DebugString();
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/streaming-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.h 
b/be/src/exec/streaming-aggregation-node.h
new file mode 100644
index 0000000..8e06b2a
--- /dev/null
+++ b/be/src/exec/streaming-aggregation-node.h
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H
+#define IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H
+
+#include <memory>
+
+#include "exec/exec-node.h"
+#include "exec/grouping-aggregator.h"
+
+namespace impala {
+
+class RowBatch;
+class RuntimeState;
+
+/// Node for doing streaming partitioned hash aggregation.
+///
+/// This node consumes the input from child(0) during GetNext() and then 
passes it to the
+/// Aggregator, which does the actual work of aggregating. The aggregator will 
attempt to
+/// aggregate the rows into its hash table, but if there is not enough memory 
available or
+/// if the reduction from the aggregation is not very good, it will 'stream' 
the rows
+/// through and return them without aggregating them instead of spilling. 
After all of the
+/// input has been processed from child(0), subsequent calls to GetNext() will 
return any
+/// rows that were aggregated in the Aggregator's hash table.
+///
+/// Since the rows returned by GetNext() may be only partially aggregated if 
there are
+/// memory constraints, this is a preliminary aggregation step that functions 
as an
+/// optimization and will always be followed in the plan by an AggregationNode 
that does
+/// the final aggregation.
+///
+/// This node only supports grouping aggregations.
+class StreamingAggregationNode : public ExecNode {
+ public:
+  StreamingAggregationNode(
+      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
+
+  virtual void DebugString(int indentation_level, std::stringstream* out) 
const override;
+
+ private:
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Row batch passed to GetNext() on the child node. Stored in the node to avoid
+  /// reallocating it for every GetNext() call when streaming.
+  std::unique_ptr<RowBatch> child_batch_;
+
+  /// True if no more rows to process from child.
+  bool child_eos_;
+
+  std::unique_ptr<GroupingAggregator> aggregator_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  /// Get output rows from child for streaming pre-aggregation. Aggregates 
some rows with
+  /// hash table and passes through other rows converted into the intermediate
+  /// tuple format. Sets 'child_eos_' once all rows from child have been 
returned.
+  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) 
WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H

Reply via email to