Repository: impala Updated Branches: refs/heads/master dde930830 -> e07fbc1b6
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h deleted file mode 100644 index c522c1b..0000000 --- a/be/src/exec/partitioned-aggregation-node.h +++ /dev/null @@ -1,734 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- - -#ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H -#define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H - -#include <deque> - -#include <boost/scoped_ptr.hpp> - -#include "exec/exec-node.h" -#include "exec/hash-table.h" -#include "runtime/buffered-tuple-stream.h" -#include "runtime/bufferpool/suballocator.h" -#include "runtime/descriptors.h" // for TupleId -#include "runtime/mem-pool.h" -#include "runtime/string-value.h" - -namespace llvm { -class BasicBlock; -class Function; -class Value; -} - -namespace impala { - -class AggFn; -class AggFnEvaluator; -class CodegenAnyVal; -class LlvmCodeGen; -class LlvmBuilder; -class RowBatch; -class RuntimeState; -struct StringValue; -class Tuple; -class TupleDescriptor; -class SlotDescriptor; - -/// Node for doing partitioned hash aggregation. -/// This node consumes the input (which can be from the child(0) or a spilled partition). -/// 1. Each row is hashed and we pick a dst partition (hash_partitions_). -/// 2. If the dst partition is not spilled, we probe into the partitions hash table -/// to aggregate/insert the row. -/// 3. If the partition is already spilled, the input row is spilled. -/// 4. When all the input is consumed, we walk hash_partitions_, put the spilled ones -/// into spilled_partitions_ and the non-spilled ones into aggregated_partitions_. -/// aggregated_partitions_ contain partitions that are fully processed and the result -/// can just be returned. Partitions in spilled_partitions_ need to be repartitioned -/// and we just repeat these steps. -// -/// Each partition contains these structures: -/// 1) Hash Table for aggregated rows. This contains just the hash table directory -/// structure but not the rows themselves. This is NULL for spilled partitions when -/// we stop maintaining the hash table. -/// 2) MemPool for var-len result data for rows in the hash table. If the aggregate -/// function returns a string, we cannot append it to the tuple stream as that -/// structure is immutable. 
Instead, when we need to spill, we sweep and copy the -/// rows into a tuple stream. -/// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream -/// contains rows that are aggregated. When the partition is not spilled, this stream -/// is pinned and contains the memory referenced by the hash table. -/// In the case where the aggregate function does not return a string (meaning the -/// size of all the slots is known when the row is constructed), this stream contains -/// all the memory for the result rows and the MemPool (2) is not used. -/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows. -/// Rows in this stream always have child(0)'s layout. -/// -/// Buffering: Each stream and hash table needs to maintain at least one buffer when -/// it is being read or written. The streams for a given agg use a uniform buffer size, -/// except when processing rows larger than that buffer size. In that case, the agg uses -/// BufferedTupleStream's variable buffer size support to handle larger rows up to the -/// maximum row size. Only two max-sized buffers are needed for the agg to spill: one -/// to hold rows being read from a spilled input stream and another for a temporary write -/// buffer when adding a row to an output stream. -/// -/// Two-phase aggregation: we support two-phase distributed aggregations, where -/// pre-aggregrations attempt to reduce the size of data before shuffling data across the -/// network to be merged by the merge aggregation node. This exec node supports a -/// streaming mode for pre-aggregations where it maintains a hash table of aggregated -/// rows, but can pass through unaggregated rows (after transforming them into the -/// same tuple format as aggregated rows) when a heuristic determines that it is better -/// to send rows across the network instead of consuming additional memory and CPU -/// resources to expand its hash table. 
The planner decides whether a given -/// pre-aggregation should use the streaming preaggregation algorithm or the same -/// blocking aggregation algorithm as used in merge aggregations. -/// TODO: make this less of a heuristic by factoring in the cost of the exchange vs the -/// cost of the pre-aggregation. -/// -/// If there are no grouping expressions, there is only a single output row for both -/// preaggregations and merge aggregations. This case is handled separately to avoid -/// building hash tables. There is also no need to do streaming preaggregations. -/// -/// Handling memory pressure: the node uses two different strategies for responding to -/// memory pressure, depending on whether it is a streaming pre-aggregation or not. If -/// the node is a streaming preaggregation, it stops growing its hash table further by -/// converting unaggregated rows into the aggregated tuple format and passing them -/// through. If the node is not a streaming pre-aggregation, it responds to memory -/// pressure by spilling partitions to disk. -/// -/// TODO: Buffer rows before probing into the hash table? -/// TODO: After spilling, we can still maintain a very small hash table just to remove -/// some number of rows (from likely going to disk). -/// TODO: Consider allowing to spill the hash table structure in addition to the rows. -/// TODO: Do we want to insert a buffer before probing into the partition's hash table? -/// TODO: Use a prefetch/batched probe interface. -/// TODO: Return rows from the aggregated_row_stream rather than the HT. -/// TODO: Think about spilling heuristic. -/// TODO: When processing a spilled partition, we have a lot more information and can -/// size the partitions/hash tables better. -/// TODO: Start with unpartitioned (single partition) and switch to partitioning and -/// spilling only if the size gets large, say larger than the LLC. -/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and ctx. 
-/// There are so many contexts in use that a plain "ctx" variable should never be used. -/// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this. -/// TODO: support an Init() method with an initial value in the UDAF interface. -class PartitionedAggregationNode : public ExecNode { - public: - PartitionedAggregationNode(ObjectPool* pool, - const TPlanNode& tnode, const DescriptorTbl& descs); - - virtual Status Init(const TPlanNode& tnode, RuntimeState* state); - virtual Status Prepare(RuntimeState* state); - virtual void Codegen(RuntimeState* state); - virtual Status Open(RuntimeState* state); - virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status Reset(RuntimeState* state); - virtual void Close(RuntimeState* state); - - static const char* LLVM_CLASS_NAME; - - protected: - virtual std::string DebugString(int indentation_level) const; - virtual void DebugString(int indentation_level, std::stringstream* out) const; - - private: - struct Partition; - - /// Number of initial partitions to create. Must be a power of 2. - static const int PARTITION_FANOUT = 16; - - /// Needs to be the log(PARTITION_FANOUT). - /// We use the upper bits to pick the partition and lower bits in the HT. - /// TODO: different hash functions here too? We don't need that many bits to pick - /// the partition so this might be okay. - static const int NUM_PARTITIONING_BITS = 4; - - /// Maximum number of times we will repartition. The maximum build table we can process - /// (if we have enough scratch disk space) in case there is no skew is: - /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). - /// In the case where there is skew, repartitioning is unlikely to help (assuming a - /// reasonable hash function). - /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx. - /// TODO: we can revisit and try harder to explicitly detect skew. 
- static const int MAX_PARTITION_DEPTH = 16; - - /// Default initial number of buckets in a hash table. - /// TODO: rethink this ? - static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024; - - /// Codegen doesn't allow for automatic Status variables because then exception - /// handling code is needed to destruct the Status, and our function call substitution - /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by - /// placing the Status here so exceptions won't need to destruct it. - /// TODO: fix IMPALA-1948 and remove this. - Status process_batch_status_; - - /// Tuple into which Update()/Merge()/Serialize() results are stored. - TupleId intermediate_tuple_id_; - TupleDescriptor* intermediate_tuple_desc_; - - /// Row with the intermediate tuple as its only tuple. - /// Construct a new row desc for preparing the build exprs because neither the child's - /// nor this node's output row desc may contain the intermediate tuple, e.g., - /// in a single-node plan with an intermediate tuple different from the output tuple. - /// Lives in the query state's obj_pool. - RowDescriptor intermediate_row_desc_; - - /// Tuple into which Finalize() results are stored. Possibly the same as - /// the intermediate tuple. - TupleId output_tuple_id_; - TupleDescriptor* output_tuple_desc_; - - /// Certain aggregates require a finalize step, which is the final step of the - /// aggregate after consuming all input rows. The finalize step converts the aggregate - /// value into its final form. This is true if this node contains aggregate that - /// requires a finalize step. - const bool needs_finalize_; - - /// True if this is first phase of a two-phase distributed aggregation for which we - /// are doing a streaming preaggregation. - const bool is_streaming_preagg_; - - /// True if any of the evaluators require the serialize step. - bool needs_serialize_; - - /// The list of all aggregate operations for this exec node. 
- std::vector<AggFn*> agg_fns_; - - /// Evaluators for each aggregate function. If this is a grouping aggregation, these - /// evaluators are only used to create cloned per-partition evaluators. The cloned - /// evaluators are then used to evaluate the functions. If this is a non-grouping - /// aggregation these evaluators are used directly to evaluate the functions. - /// - /// Permanent and result allocations for these allocators are allocated from - /// 'expr_perm_pool_' and 'expr_results_pool_' respectively. - std::vector<AggFnEvaluator*> agg_fn_evals_; - - /// Exprs used to evaluate input rows - std::vector<ScalarExpr*> grouping_exprs_; - - /// Exprs used to insert constructed aggregation tuple into the hash table. - /// All the exprs are simply SlotRefs for the intermediate tuple. - std::vector<ScalarExpr*> build_exprs_; - - /// Indices of grouping exprs with var-len string types in grouping_exprs_. - /// We need to do more work for var-len expressions when allocating and spilling rows. - /// All var-len grouping exprs have type string. - std::vector<int> string_grouping_exprs_; - - RuntimeState* state_; - - /// Allocator for hash table memory. - boost::scoped_ptr<Suballocator> ht_allocator_; - - /// MemPool used to allocate memory for when we don't have grouping and don't initialize - /// the partitioning structures, or during Close() when creating new output tuples. - /// For non-grouping aggregations, the ownership of the pool's memory is transferred - /// to the output batch on eos. The pool should not be Reset() to allow amortizing - /// memory allocation over a series of Reset()/Open()/GetNext()* calls. 
- boost::scoped_ptr<MemPool> singleton_tuple_pool_; - - /// The current partition and iterator to the next row in its hash table that we need - /// to return in GetNext() - Partition* output_partition_; - HashTable::Iterator output_iterator_; - - typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*); - /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled. - ProcessBatchNoGroupingFn process_batch_no_grouping_fn_; - - typedef Status (*ProcessBatchFn)( - PartitionedAggregationNode*, RowBatch*, TPrefetchMode::type, HashTableCtx*); - /// Jitted ProcessBatch function pointer. Null if codegen is disabled. - ProcessBatchFn process_batch_fn_; - - typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool, - TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]); - /// Jitted ProcessBatchStreaming function pointer. Null if codegen is disabled. - ProcessBatchStreamingFn process_batch_streaming_fn_; - - /// Time spent processing the child rows - RuntimeProfile::Counter* build_timer_; - - /// Total time spent resizing hash tables. - RuntimeProfile::Counter* ht_resize_timer_; - - /// Time spent returning the aggregated rows - RuntimeProfile::Counter* get_results_timer_; - - /// Total number of hash buckets across all partitions. - RuntimeProfile::Counter* num_hash_buckets_; - - /// Total number of partitions created. - RuntimeProfile::Counter* partitions_created_; - - /// Level of max partition (i.e. number of repartitioning steps). - RuntimeProfile::HighWaterMarkCounter* max_partition_level_; - - /// Number of rows that have been repartitioned. - RuntimeProfile::Counter* num_row_repartitioned_; - - /// Number of partitions that have been repartitioned. - RuntimeProfile::Counter* num_repartitions_; - - /// Number of partitions that have been spilled. - RuntimeProfile::Counter* num_spilled_partitions_; - - /// The largest fraction after repartitioning. 
This is expected to be - /// 1 / PARTITION_FANOUT. A value much larger indicates skew. - RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_; - - /// Time spent in streaming preagg algorithm. - RuntimeProfile::Counter* streaming_timer_; - - /// The number of rows passed through without aggregation. - RuntimeProfile::Counter* num_passthrough_rows_; - - /// The estimated reduction of the preaggregation. - RuntimeProfile::Counter* preagg_estimated_reduction_; - - /// Expose the minimum reduction factor to continue growing the hash tables. - RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_; - - /// The estimated number of input rows from the planner. - int64_t estimated_input_cardinality_; - - ///////////////////////////////////////// - /// BEGIN: Members that must be Reset() - - /// Result of aggregation w/o GROUP BY. - /// Note: can be NULL even if there is no grouping if the result tuple is 0 width - /// e.g. select 1 from table group by col. - Tuple* singleton_output_tuple_; - bool singleton_output_tuple_returned_; - - /// Row batch used as argument to GetNext() for the child node preaggregations. Store - /// in node to avoid reallocating for every GetNext() call when streaming. - boost::scoped_ptr<RowBatch> child_batch_; - - /// If true, no more rows to output from partitions. - bool partition_eos_; - - /// True if no more rows to process from child. - bool child_eos_; - - /// Used for hash-related functionality, such as evaluating rows and calculating hashes. - /// It also owns the evaluators for the grouping and build expressions used during hash - /// table insertion and probing. - boost::scoped_ptr<HashTableCtx> ht_ctx_; - - /// Object pool that holds the Partition objects in hash_partitions_. - boost::scoped_ptr<ObjectPool> partition_pool_; - - /// Current partitions we are partitioning into. 
IMPALA-5788: For the case where we - /// rebuild a spilled partition that fits in memory, all pointers in this vector will - /// point to a single in-memory partition. - std::vector<Partition*> hash_partitions_; - - /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we - /// rebuild a spilled partition that fits in memory, all pointers in this array will - /// point to the hash table that is a part of a single in-memory partition. - HashTable* hash_tbls_[PARTITION_FANOUT]; - - /// All partitions that have been spilled and need further processing. - std::deque<Partition*> spilled_partitions_; - - /// All partitions that are aggregated and can just return the results in GetNext(). - /// After consuming all the input, hash_partitions_ is split into spilled_partitions_ - /// and aggregated_partitions_, depending on if it was spilled or not. - std::deque<Partition*> aggregated_partitions_; - - /// END: Members that must be Reset() - ///////////////////////////////////////// - - /// The hash table and streams (aggregated and unaggregated) for an individual - /// partition. The streams of each partition always (i.e. regardless of level) - /// initially use small buffers. Streaming pre-aggregations do not spill and do not - /// require an unaggregated stream. - struct Partition { - Partition(PartitionedAggregationNode* parent, int level, int idx) - : parent(parent), is_closed(false), level(level), idx(idx) {} - - ~Partition(); - - /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling - /// aggregation), allocating one buffer for each. Spilling merge aggregations must - /// have enough reservation for the initial buffer for the stream, so this should - /// not fail due to OOM. Preaggregations do not reserve any buffers: if does not - /// have enough reservation for the initial buffer, the aggregated row stream is not - /// created and an OK status is returned. 
- Status InitStreams() WARN_UNUSED_RESULT; - - /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL. - /// Sets 'got_memory' to true if the hash table was initialised or false on OOM. - Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT; - - /// Called in case we need to serialize aggregated rows. This step effectively does - /// a merge aggregation in this node. - Status SerializeStreamForSpilling() WARN_UNUSED_RESULT; - - /// Closes this partition. If finalize_rows is true, this iterates over all rows - /// in aggregated_row_stream and finalizes them (this is only used in the cancellation - /// path). - void Close(bool finalize_rows); - - /// Spill this partition. 'more_aggregate_rows' = true means that more aggregate rows - /// may be appended to the the partition before appending unaggregated rows. On - /// success, one of the streams is left with a write iterator: the aggregated stream - /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise. - Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT; - - bool is_spilled() const { return hash_tbl.get() == NULL; } - - PartitionedAggregationNode* parent; - - /// If true, this partition is closed and there is nothing left to do. - bool is_closed; - - /// How many times rows in this partition have been repartitioned. Partitions created - /// from the node's children's input is level 0, 1 after the first repartitionining, - /// etc. - const int level; - - /// The index of this partition within 'hash_partitions_' at its level. - const int idx; - - /// Hash table for this partition. - /// Can be NULL if this partition is no longer maintaining a hash table (i.e. - /// is spilled or we are passing through all rows for this partition). - boost::scoped_ptr<HashTable> hash_tbl; - - /// Clone of parent's agg_fn_evals_. Permanent allocations come from - /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's - /// 'expr_results_pool_'. 
- std::vector<AggFnEvaluator*> agg_fn_evals; - - /// Pool for permanent allocations for this partition's 'agg_fn_evals'. Freed at the - /// same times as 'agg_fn_evals' are closed: either when the partition is closed or - /// when it is spilled. - boost::scoped_ptr<MemPool> agg_fn_perm_pool; - - /// Tuple stream used to store aggregated rows. When the partition is not spilled, - /// (meaning the hash table is maintained), this stream is pinned and contains the - /// memory referenced by the hash table. When it is spilled, this consumes reservation - /// for a write buffer only during repartitioning of aggregated rows. - /// - /// For streaming preaggs, this may be NULL if sufficient memory is not available. - /// In that case hash_tbl is also NULL and all rows for the partition will be passed - /// through. - boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream; - - /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations. - /// Always unpinned. Has a write buffer allocated when the partition is spilled and - /// unaggregated rows are being processed. - boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream; - }; - - /// Stream used to store serialized spilled rows. Only used if needs_serialize_ - /// is set. This stream is never pinned and only used in Partition::Spill as a - /// a temporary buffer. - boost::scoped_ptr<BufferedTupleStream> serialize_stream_; - - /// Accessor for 'hash_tbls_' that verifies consistency with the partitions. - HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { - HashTable* ht = hash_tbls_[partition_idx]; - DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get()); - return ht; - } - - /// Constructs singleton output tuple, allocating memory from pool. - Tuple* ConstructSingletonOutputTuple( - const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool); - - /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_' - /// using 'grouping_expr_evals_'. 
Aggregation expr slots are set to their initial - /// values. Returns NULL if there was not enough memory to allocate the tuple or errors - /// occurred. In which case, 'status' is set. Allocates tuple and var-len data for - /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the - /// FunctionContexts, so is stored outside the stream. If stream's small buffers get - /// full, it will attempt to switch to IO-buffers. - Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, - BufferedTupleStream* stream, Status* status) noexcept; - - /// Constructs intermediate tuple, allocating memory from pool instead of the stream. - /// Returns NULL and sets status if there is not enough memory to allocate the tuple. - Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, - MemPool* pool, Status* status) noexcept; - - /// Returns the number of bytes of variable-length data for the grouping values stored - /// in 'ht_ctx_'. - int GroupingExprsVarlenSize(); - - /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_' that - /// that were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the - /// var-len data into buffer. 'buffer' points to the start of a buffer of at least the - /// size of the variable-length data: 'varlen_size'. - void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size); - - /// Initializes the aggregate function slots of an intermediate tuple. - /// Any var-len data is allocated from the FunctionContexts. - void InitAggSlots(const std::vector<AggFnEvaluator*>& agg_fn_evals, - Tuple* intermediate_tuple); - - /// Updates the given aggregation intermediate tuple with aggregation values computed - /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or - /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing - /// in is_merge == true. 
The override is needed to merge spilled and non-spilled rows - /// belonging to the same partition independent of whether the agg fn evaluators have - /// is_merge() == true. - /// This function is replaced by codegen (which is why we don't use a vector argument - /// for agg_fn_evals).. Any var-len data is allocated from the FunctionContexts. - /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too. - void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, - bool is_merge = false) noexcept; - - /// Called on the intermediate tuple of each group after all input rows have been - /// consumed and aggregated. Computes the final aggregate values to be returned in - /// GetNext() using the agg fn evaluators' Serialize() or Finalize(). - /// For the Finalize() case if the output tuple is different from the intermediate - /// tuple, then a new tuple is allocated from 'pool' to hold the final result. - /// Grouping values are copied into the output tuple and the the output tuple holding - /// the finalized/serialized aggregate values is returned. - /// TODO: Coordinate the allocation of new tuples with the release of memory - /// so as not to make memory consumption blow up. - Tuple* GetOutputTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, - Tuple* tuple, MemPool* pool); - - /// Do the aggregation for all tuple rows in the batch when there is no grouping. - /// This function is replaced by codegen. - Status ProcessBatchNoGrouping(RowBatch* batch) WARN_UNUSED_RESULT; - - /// Processes a batch of rows. This is the core function of the algorithm. We partition - /// the rows into hash_partitions_, spilling as necessary. - /// If AGGREGATED_ROWS is true, it means that the rows in the batch are already - /// pre-aggregated. - /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE, - /// hash table buckets will be prefetched based on the hash values computed. 
Note - /// that 'prefetch_mode' will be substituted with constants during codegen time. - // - /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for - /// performance. - template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, TPrefetchMode::type prefetch_mode, - HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; - - /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in - /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on - /// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use. - /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be - /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant. - template<bool AGGREGATED_ROWS> - void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, - TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx); - - /// This function processes each individual row in ProcessBatch(). Must be inlined into - /// ProcessBatch for codegen to substitute function calls with codegen'd versions. - /// May spill partitions if not enough memory is available. - template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessRow( - TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; - - /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is - /// the context for the partition's hash table and hash is the precomputed hash of - /// the row. The row can be an unaggregated or aggregated row depending on - /// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate - /// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen - /// to substitute function calls with codegen'd versions. insert_it is an iterator - /// for insertion returned from HashTable::FindBuildRowBucket(). 
- template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* row, - uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT; - - /// Append a row to a spilled partition. The row may be aggregated or unaggregated - /// according to AGGREGATED_ROWS. May spill partitions if needed to append the row - /// buffers. - template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE AppendSpilledRow( - Partition* partition, TupleRow* row) WARN_UNUSED_RESULT; - - /// Reads all the rows from input_stream and process them by calling ProcessBatch(). - template <bool AGGREGATED_ROWS> - Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT; - - /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'. - void GetSingletonOutput(RowBatch* row_batch); - - /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to - /// true if all rows from all partitions have been returned or the limit is reached. - Status GetRowsFromPartition( - RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; - - /// Get output rows from child for streaming pre-aggregation. Aggregates some rows with - /// hash table and passes through other rows converted into the intermediate - /// tuple format. Sets 'child_eos_' once all rows from child have been returned. - Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; - - /// Return true if we should keep expanding hash tables in the preagg. If false, - /// the preagg should pass through any rows it can't fit in its tables. - bool ShouldExpandPreaggHashTables() const; - - /// Streaming processing of in_batch from child. Rows from child are either aggregated - /// into the hash table or added to 'out_batch' in the intermediate tuple format. - /// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to - /// store all of the rows in 'in_batch'. 
- /// 'needs_serialize' is an argument so that codegen can replace it with a constant, - /// rather than using the member variable 'needs_serialize_'. - /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE, - /// hash table buckets will be prefetched based on the hash values computed. Note - /// that 'prefetch_mode' will be substituted with constants during codegen time. - /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of - /// additional rows that can be added to the hash table per partition. It is updated - /// by ProcessBatchStreaming() when it inserts new rows. - /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser. - Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type prefetch_mode, - RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx, - int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT; - - /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming - /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set - /// to the corresponding hash. If the tuple already exists in the hash table, update - /// the tuple and return true. Otherwise try to create a new entry in the hash table, - /// returning true if successful or false if the table is full. 'remaining_capacity' - /// keeps track of how many more entries can be added to the hash table so we can avoid - /// retrying inserts. It is decremented if an insert succeeds and set to zero if an - /// insert fails. If an error occurs, returns false and sets 'status'. - bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* partition, - HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity, - Status* status) WARN_UNUSED_RESULT; - - /// Initializes hash_partitions_. 'level' is the level for the partitions to create. 
- /// If 'single_partition_idx' is provided, it must be a number in range - /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it. - /// Also sets ht_ctx_'s level to 'level'. - Status CreateHashPartitions( - int level, int single_partition_idx = -1) WARN_UNUSED_RESULT; - - /// Ensure that hash tables for all in-memory partitions are large enough to fit - /// 'num_rows' additional hash table entries. If there is not enough memory to - /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if - /// we're currently partitioning aggregated rows. - Status CheckAndResizeHashPartitions( - bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; - - /// Prepares the next partition to return results from. On return, this function - /// initializes output_iterator_ and output_partition_. This either removes - /// a partition from aggregated_partitions_ (and is done) or removes the next - /// partition from aggregated_partitions_ and repartitions it. - Status NextPartition() WARN_UNUSED_RESULT; - - /// Tries to build the first partition in 'spilled_partitions_'. - /// If successful, set *built_partition to the partition. The caller owns the partition - /// and is responsible for closing it. If unsuccessful because the partition could not - /// fit in memory, set *built_partition to NULL and append the spilled partition to the - /// head of 'spilled_partitions_' so it can be processed by - /// RepartitionSpilledPartition(). - Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT; - - /// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT - /// output partitions. On success, each output partition is either: - /// * closed, if no rows were added to the partition. - /// * in 'spilled_partitions_', if the partition spilled. - /// * in 'aggregated_partitions_', if the output partition was not spilled. 
- Status RepartitionSpilledPartition() WARN_UNUSED_RESULT; - - /// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is passed - /// to Partition::Spill() when spilling the partition. See the Partition::Spill() - /// comment for further explanation. - Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT; - - /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or - /// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned. - /// input_rows is the number of input rows that have been repartitioned. - /// Used for diagnostics. - Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT; - - /// Adds a partition to the front of 'spilled_partitions_' for later processing. - /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed - /// first). This allows us to delete pages earlier and bottom out the recursion - /// earlier and also improves time locality of access to spilled data on disk. - void PushSpilledPartition(Partition* partition); - - /// Calls Close() on every Partition in 'aggregated_partitions_', - /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists, - /// the vector and the partition pool. - void ClosePartitions(); - - /// Calls finalizes on all tuples starting at 'it'. - void CleanupHashTbl(const std::vector<AggFnEvaluator*>& agg_fn_evals, - HashTable::Iterator it); - - /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx] - /// and returns the IR function in 'fn'. Returns non-OK status if codegen - /// is unsuccessful. - Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx, - SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT; - - /// Codegen a call to a function implementing the UDA interface with input values - /// from 'input_vals'. 
'dst_val' should contain the previous value of the aggregate - /// function, and 'updated_dst_val' is set to the new value after the Update or Merge - /// operation is applied. The instruction sequence for the UDA call is inserted at - /// the insert position of 'builder'. - Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn, - llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals, - const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT; - - /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful. - Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT; - - /// Codegen the non-streaming process row batch loop. The loop has already been - /// compiled to IR and loaded into the codegen object. UpdateAggTuple has also been - /// codegen'd to IR. This function will modify the loop subsituting the statically - /// compiled functions with codegen'd ones. 'process_batch_fn_' or - /// 'process_batch_no_grouping_fn_' will be updated with the codegened function - /// depending on whether this is a grouping or non-grouping aggregation. - /// Assumes AGGREGATED_ROWS = false. - Status CodegenProcessBatch( - LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; - - /// Codegen the materialization loop for streaming preaggregations. - /// 'process_batch_streaming_fn_' will be updated with the codegened function. - Status CodegenProcessBatchStreaming( - LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; - - /// Compute minimum buffer reservation for grouping aggregations. - /// We need one buffer per partition, which is used either as the write buffer for the - /// aggregated stream or the unaggregated stream. We need an additional buffer to read - /// the stream we are currently repartitioning. 
The read buffer needs to be a max-sized - /// buffer to hold a max-sized row and we need one max-sized write buffer that is used - /// temporarily to append a row to any stream. - /// - /// If we need to serialize, we need an additional buffer while spilling a partition - /// as the partitions aggregate stream needs to be serialized and rewritten. - /// We do not spill streaming preaggregations, so we do not need to reserve any buffers. - int64_t MinReservation() const { - DCHECK(!grouping_exprs_.empty()); - // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe. - if (is_streaming_preagg_) { - // Reserve at least one buffer and a 64kb hash table per partition. - return (resource_profile_.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT; - } - int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0); - // Two of the buffers must fit the maximum row. - return resource_profile_.spillable_buffer_size * (num_buffers - 2) + - resource_profile_.max_row_buffer_size * 2; - } -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/streaming-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc new file mode 100644 index 0000000..4ad7820 --- /dev/null +++ b/be/src/exec/streaming-aggregation-node.cc @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/streaming-aggregation-node.h" + +#include <sstream> + +#include "gutil/strings/substitute.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "util/runtime-profile-counters.h" + +#include "gen-cpp/PlanNodes_types.h" + +#include "common/names.h" + +namespace impala { + +StreamingAggregationNode::StreamingAggregationNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), child_eos_(false) { + DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts"; + DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; + DCHECK(limit_ == -1) << "Preaggs have no limits"; +} + +Status StreamingAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl())); + RETURN_IF_ERROR(aggregator_->Init(tnode, state)); + runtime_profile_->AddChild(aggregator_->runtime_profile()); + return Status::OK(); +} + +Status StreamingAggregationNode::Prepare(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecNode::Prepare(state)); + aggregator_->SetDebugOptions(debug_options_); + RETURN_IF_ERROR(aggregator_->Prepare(state)); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); + return Status::OK(); +} + +void StreamingAggregationNode::Codegen(RuntimeState* state) { + DCHECK(state->ShouldCodegen()); + ExecNode::Codegen(state); + if (IsNodeCodegenDisabled()) 
return; + + aggregator_->Codegen(state); +} + +Status StreamingAggregationNode::Open(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + // Open the child before consuming resources in this node. + RETURN_IF_ERROR(child(0)->Open(state)); + RETURN_IF_ERROR(ExecNode::Open(state)); + + RETURN_IF_ERROR(aggregator_->Open(state)); + + // Streaming preaggregations do all processing in GetNext(). + return Status::OK(); +} + +Status StreamingAggregationNode::GetNext( + RuntimeState* state, RowBatch* row_batch, bool* eos) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + if (ReachedLimit()) { + *eos = true; + return Status::OK(); + } + + bool aggregator_eos = false; + if (!child_eos_) { + // For streaming preaggregations, we process rows from the child as we go. + RETURN_IF_ERROR(GetRowsStreaming(state, row_batch)); + } else { + RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, &aggregator_eos)); + } + + *eos = aggregator_eos && child_eos_; + num_rows_returned_ += row_batch->num_rows(); + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + return Status::OK(); +} + +Status StreamingAggregationNode::GetRowsStreaming( + RuntimeState* state, RowBatch* out_batch) { + DCHECK(!child_eos_); + + if (child_batch_ == nullptr) { + child_batch_.reset( + new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); + } + + do { + DCHECK_EQ(out_batch->num_rows(), 0); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_)); + + RETURN_IF_ERROR(aggregator_->AddBatchStreaming(state, out_batch, child_batch_.get())); + child_batch_->Reset(); // All rows from child_batch_ were processed. 
+ } while (out_batch->num_rows() == 0 && !child_eos_); + + if (child_eos_) { + child(0)->Close(state); + child_batch_.reset(); + RETURN_IF_ERROR(aggregator_->InputDone()); + } + + return Status::OK(); +} + +Status StreamingAggregationNode::Reset(RuntimeState* state) { + DCHECK(false) << "Cannot reset preaggregation"; + return Status("Cannot reset preaggregation"); +} + +void StreamingAggregationNode::Close(RuntimeState* state) { + if (is_closed()) return; + aggregator_->Close(state); + child_batch_.reset(); + ExecNode::Close(state); +} + +void StreamingAggregationNode::DebugString( + int indentation_level, stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "StreamingAggregationNode(" + << "aggregator=" << aggregator_->DebugString(); + ExecNode::DebugString(indentation_level, out); + *out << ")"; +} +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/streaming-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h new file mode 100644 index 0000000..8e06b2a --- /dev/null +++ b/be/src/exec/streaming-aggregation-node.h @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H +#define IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H + +#include <memory> + +#include "exec/exec-node.h" +#include "exec/grouping-aggregator.h" + +namespace impala { + +class RowBatch; +class RuntimeState; + +/// Node for doing streaming partitioned hash aggregation. +/// +/// This node consumes the input from child(0) during GetNext() and then passes it to the +/// Aggregator, which does the actual work of aggregating. The aggregator will attempt to +/// aggregate the rows into its hash table, but if there is not enough memory available or +/// if the reduction from the aggregation is not very good, it will 'stream' the rows +/// through and return them without aggregating them instead of spilling. After all of the +/// input has been processed from child(0), subsequent calls to GetNext() will return any +/// rows that were aggregated in the Aggregator's hash table. +/// +/// Since the rows returned by GetNext() may be only partially aggregated if there are +/// memory constraints, this is a preliminary aggregation step that functions as an +/// optimization and will always be followed in the plan by an AggregationNode that does +/// the final aggregation. +/// +/// This node only supports grouping aggregations.
+class StreamingAggregationNode : public ExecNode { + public: + StreamingAggregationNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; + virtual Status Prepare(RuntimeState* state) override; + virtual void Codegen(RuntimeState* state) override; + virtual Status Open(RuntimeState* state) override; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status Reset(RuntimeState* state) override; + virtual void Close(RuntimeState* state) override; + + virtual void DebugString(int indentation_level, std::stringstream* out) const override; + + private: + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Row batch used as argument to GetNext() for the child node preaggregations. Store + /// in node to avoid reallocating for every GetNext() call when streaming. + std::unique_ptr<RowBatch> child_batch_; + + /// True if no more rows to process from child. + bool child_eos_; + + std::unique_ptr<GroupingAggregator> aggregator_; + + /// END: Members that must be Reset() + ///////////////////////////////////////// + + /// Get output rows from child for streaming pre-aggregation. Aggregates some rows with + /// hash table and passes through other rows converted into the intermediate + /// tuple format. Sets 'child_eos_' once all rows from child have been returned. + Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; +}; +} // namespace impala + +#endif // IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H
