http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc new file mode 100644 index 0000000..60001a6 --- /dev/null +++ b/be/src/exec/grouping-aggregator.cc @@ -0,0 +1,1098 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/grouping-aggregator.h" + +#include <sstream> + +#include "codegen/llvm-codegen.h" +#include "exec/exec-node.h" +#include "exec/hash-table.inline.h" +#include "exprs/agg-fn-evaluator.h" +#include "exprs/scalar-expr.h" +#include "exprs/slot-ref.h" +#include "gutil/strings/substitute.h" +#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/descriptors.h" +#include "runtime/exec-env.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "runtime/tuple.h" +#include "util/runtime-profile-counters.h" + +#include "gen-cpp/PlanNodes_types.h" + +#include "common/names.h" + +namespace impala { + +/// The minimum reduction factor (input rows divided by output rows) to grow hash tables +/// in a streaming preaggregation, given that the hash tables are currently the given +/// size or above. The sizes roughly correspond to hash table sizes where the bucket +/// arrays will fit in a cache level. Intuitively, we don't want the working set of the +/// aggregation to expand to the next level of cache unless we're reducing the input +/// enough to outweigh the increased memory latency we'll incur for each hash table +/// lookup. +/// +/// Note that the current reduction achieved is not always a good estimate of the +/// final reduction. It may be biased either way depending on the ordering of the +/// input. If the input order is random, we will underestimate the final reduction +/// factor because the probability of a row having the same key as a previous row +/// increases as more input is processed. If the input order is correlated with the +/// key, skew may bias the estimate. If high cardinality keys appear first, we +/// may overestimate and if low cardinality keys appear first, we underestimate. 
+/// To estimate the eventual reduction achieved, we estimate the final reduction +/// using the planner's estimated input cardinality and the assumption that input +/// is in a random order. This means that we assume that the reduction factor will +/// increase over time. +struct StreamingHtMinReductionEntry { + // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories + // in bytes is greater than this threshold. + int min_ht_mem; + // The minimum reduction factor to expand the hash tables. + double streaming_ht_min_reduction; +}; + +// TODO: experimentally tune these values and also programmatically get the cache size +// of the machine that we're running on. +static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { + // Expand up to L2 cache always. + {0, 0.0}, + // Expand into L3 cache if we look like we're getting some reduction. + {256 * 1024, 1.1}, + // Expand into main memory if we're getting a significant reduction. + {2 * 1024 * 1024, 2.0}, +}; + +static const int STREAMING_HT_MIN_REDUCTION_SIZE = + sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); + +GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, + const TPlanNode& tnode, const DescriptorTbl& descs) + : Aggregator(exec_node, pool, tnode, descs, "GroupingAggregator"), + intermediate_row_desc_(intermediate_tuple_desc_, false), + is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation), + needs_serialize_(false), + output_partition_(nullptr), + resource_profile_(exec_node->resource_profile()), + num_input_rows_(0), + is_in_subplan_(exec_node->IsInSubplan()), + limit_(exec_node->limit()), + add_batch_impl_fn_(nullptr), + add_batch_streaming_impl_fn_(nullptr), + ht_resize_timer_(nullptr), + get_results_timer_(nullptr), + num_hash_buckets_(nullptr), + partitions_created_(nullptr), + max_partition_level_(nullptr), + num_row_repartitioned_(nullptr), + num_repartitions_(nullptr), + 
num_spilled_partitions_(nullptr), + largest_partition_percent_(nullptr), + streaming_timer_(nullptr), + num_passthrough_rows_(nullptr), + preagg_estimated_reduction_(nullptr), + preagg_streaming_ht_min_reduction_(nullptr), + estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality), + partition_eos_(false), + partition_pool_(new ObjectPool()) { + DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS); +} + +Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScalarExpr::Create( + tnode.agg_node.grouping_exprs, input_row_desc_, state, &grouping_exprs_)); + + // Construct build exprs from intermediate_row_desc_ + for (int i = 0; i < grouping_exprs_.size(); ++i) { + SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i]; + DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type()); + // Hack to avoid TYPE_NULL SlotRefs. + SlotRef* build_expr = + pool_->Add(desc->type().type != TYPE_NULL ? new SlotRef(desc) : + new SlotRef(desc, TYPE_BOOLEAN)); + build_exprs_.push_back(build_expr); + RETURN_IF_ERROR(build_expr->Init(intermediate_row_desc_, state)); + if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i); + } + + RETURN_IF_ERROR(Aggregator::Init(tnode, state)); + for (int i = 0; i < agg_fns_.size(); ++i) { + needs_serialize_ |= agg_fns_[i]->SupportsSerialize(); + } + return Status::OK(); +} + +Status GroupingAggregator::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(Aggregator::Prepare(state)); + state_ = state; + tuple_pool_.reset(new MemPool(mem_tracker_.get())); + + ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime"); + get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime"); + num_hash_buckets_ = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT); + partitions_created_ = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT); + largest_partition_percent_ = + 
runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT); + if (is_streaming_preagg_) { + runtime_profile()->AppendExecOption("Streaming Preaggregation"); + streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime"); + num_passthrough_rows_ = + ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT); + preagg_estimated_reduction_ = + ADD_COUNTER(runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE); + preagg_streaming_ht_min_reduction_ = ADD_COUNTER( + runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE); + } else { + num_row_repartitioned_ = + ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT); + num_repartitions_ = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT); + num_spilled_partitions_ = + ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT); + max_partition_level_ = + runtime_profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT); + } + + RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, grouping_exprs_, true, + vector<bool>(build_exprs_.size(), true), state->fragment_hash_seed(), + MAX_PARTITION_DEPTH, 1, expr_perm_pool_.get(), expr_results_pool_.get(), + expr_results_pool_.get(), &ht_ctx_)); + + reservation_manager_.Init( + Substitute("GroupingAggregator id=$0 ptr=$1", id_, this), runtime_profile_, + mem_tracker_.get(), resource_profile_, debug_options_); + return Status::OK(); +} + +void GroupingAggregator::Codegen(RuntimeState* state) { + LlvmCodeGen* codegen = state->codegen(); + DCHECK(codegen != nullptr); + TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; + Status codegen_status = is_streaming_preagg_ ? 
+ CodegenAddBatchStreamingImpl(codegen, prefetch_mode) : + CodegenAddBatchImpl(codegen, prefetch_mode); + runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status); +} + +Status GroupingAggregator::Open(RuntimeState* state) { + RETURN_IF_ERROR(Aggregator::Open(state)); + + // Claim reservation after the child has been opened to reduce the peak reservation + // requirement. + if (!buffer_pool_client()->is_registered()) { + DCHECK_GE(resource_profile_.min_reservation, MinReservation()); + RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state)); + } + + DCHECK(ht_ctx_.get() != nullptr); + RETURN_IF_ERROR(ht_ctx_->Open(state)); + + if (ht_allocator_ == nullptr) { + // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call. + ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(), + buffer_pool_client(), resource_profile_.spillable_buffer_size)); + + if (!is_streaming_preagg_ && needs_serialize_) { + serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_, + buffer_pool_client(), resource_profile_.spillable_buffer_size, + resource_profile_.max_row_buffer_size)); + RETURN_IF_ERROR(serialize_stream_->Init(id_, false)); + bool got_buffer; + // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up + // another buffer during spilling. 
+ RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) << "Accounted in min reservation" + << buffer_pool_client()->DebugString(); + DCHECK(serialize_stream_->has_write_iterator()); + } + } + RETURN_IF_ERROR(CreateHashPartitions(0)); + return Status::OK(); +} + +Status GroupingAggregator::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + if (!partition_eos_) { + RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch)); + } + *eos = partition_eos_; + return Status::OK(); +} + +Status GroupingAggregator::GetRowsFromPartition( + RuntimeState* state, RowBatch* row_batch) { + DCHECK(!row_batch->AtCapacity()); + if (output_iterator_.AtEnd()) { + // Done with this partition, move onto the next one. + if (output_partition_ != nullptr) { + output_partition_->Close(false); + output_partition_ = nullptr; + } + if (aggregated_partitions_.empty() && spilled_partitions_.empty()) { + // No more partitions, all done. + partition_eos_ = true; + return Status::OK(); + } + // Process next partition. + RETURN_IF_ERROR(NextPartition()); + DCHECK(output_partition_ != nullptr); + } + + SCOPED_TIMER(get_results_timer_); + + // The output row batch may reference memory allocated by Serialize() or Finalize(), + // allocating that memory directly from the row batch's pool means we can safely return + // the batch. + vector<ScopedResultsPool> allocate_from_batch_pool = ScopedResultsPool::Create( + output_partition_->agg_fn_evals, row_batch->tuple_data_pool()); + int count = 0; + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); + // Keeping returning rows from the current partition. + while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) { + // This loop can go on for a long time if the conjuncts are very selective. Do query + // maintenance every N iterations. 
+ if ((count++ & (N - 1)) == 0) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + } + + int row_idx = row_batch->AddRow(); + TupleRow* row = row_batch->GetRow(row_idx); + Tuple* intermediate_tuple = output_iterator_.GetTuple(); + Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals, + intermediate_tuple, row_batch->tuple_data_pool()); + output_iterator_.Next(); + row->SetTuple(0, output_tuple); + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) { + row_batch->CommitLastRow(); + ++num_rows_returned_; + if (ReachedLimit()) break; + } + } + + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + partition_eos_ = ReachedLimit(); + if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy(); + + return Status::OK(); +} + +bool GroupingAggregator::ShouldExpandPreaggHashTables() const { + int64_t ht_mem = 0; + int64_t ht_rows = 0; + for (int i = 0; i < PARTITION_FANOUT; ++i) { + HashTable* ht = hash_partitions_[i]->hash_tbl.get(); + ht_mem += ht->CurrentMemSize(); + ht_rows += ht->size(); + } + + // Need some rows in tables to have valid statistics. + if (ht_rows == 0) return true; + + // Find the appropriate reduction factor in our table for the current hash table sizes. + int cache_level = 0; + while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE + && ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) { + ++cache_level; + } + + // Compare the number of rows in the hash table with the number of input rows that + // were aggregated into it. Exclude passed through rows from this calculation since + // they were not in hash tables. 
+ const int64_t aggregated_input_rows = num_input_rows_ - num_rows_returned_; + const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; + double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows; + + // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be + // inaccurate, which could lead to a divide by zero below. + if (aggregated_input_rows <= 0) return true; + + // Extrapolate the current reduction factor (r) using the formula + // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data + // set, N is the number of input rows, excluding passed-through rows, and n is the + // number of rows inserted or merged into the hash tables. This is a very rough + // approximation but is good enough to be useful. + // TODO: consider collecting more statistics to better estimate reduction. + double estimated_reduction = aggregated_input_rows >= expected_input_rows ? + current_reduction : + 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); + double min_reduction = + STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction; + + COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); + COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); + return estimated_reduction > min_reduction; +} + +void GroupingAggregator::CleanupHashTbl( + const vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it) { + if (!needs_finalize_ && !needs_serialize_) return; + + // Iterate through the remaining rows in the hash table and call Serialize/Finalize on + // them in order to free any memory allocated by UDAs. + if (needs_finalize_) { + // Finalize() requires a dst tuple but we don't actually need the result, + // so allocate a single dummy tuple to avoid accumulating memory. 
+ Tuple* dummy_dst = nullptr; + dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get()); + while (!it.AtEnd()) { + Tuple* tuple = it.GetTuple(); + AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst); + it.Next(); + // Free any expr result allocations to prevent them accumulating excessively. + expr_results_pool_->Clear(); + } + } else { + while (!it.AtEnd()) { + Tuple* tuple = it.GetTuple(); + AggFnEvaluator::Serialize(agg_fn_evals, tuple); + it.Next(); + // Free any expr result allocations to prevent them accumulating excessively. + expr_results_pool_->Clear(); + } + } +} + +Status GroupingAggregator::Reset(RuntimeState* state) { + DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation"; + partition_eos_ = false; + // Reset the HT and the partitions for this grouping agg. + ht_ctx_->set_level(0); + ClosePartitions(); + return Status::OK(); +} + +void GroupingAggregator::Close(RuntimeState* state) { + // Iterate through the remaining rows in the hash table and call Serialize/Finalize on + // them in order to free any memory allocated by UDAs + if (output_partition_ != nullptr) { + CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_); + output_partition_->Close(false); + } + + ClosePartitions(); + + if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll(); + if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state); + ht_ctx_.reset(); + if (serialize_stream_.get() != nullptr) { + serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + } + ScalarExpr::Close(grouping_exprs_); + ScalarExpr::Close(build_exprs_); + + reservation_manager_.Close(state); + // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed. 
+ Aggregator::Close(state); +} + +Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(build_timer_); + num_input_rows_ += batch->num_rows(); + + TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; + if (add_batch_impl_fn_ != nullptr) { + RETURN_IF_ERROR(add_batch_impl_fn_(this, batch, prefetch_mode, ht_ctx_.get())); + } else { + RETURN_IF_ERROR(AddBatchImpl<false>(batch, prefetch_mode, ht_ctx_.get())); + } + + return Status::OK(); +} + +Status GroupingAggregator::AddBatchStreaming( + RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch) { + SCOPED_TIMER(streaming_timer_); + num_input_rows_ += child_batch->num_rows(); + + int remaining_capacity[PARTITION_FANOUT]; + bool ht_needs_expansion = false; + for (int i = 0; i < PARTITION_FANOUT; ++i) { + HashTable* hash_tbl = GetHashTable(i); + remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize(); + ht_needs_expansion |= remaining_capacity[i] < child_batch->num_rows(); + } + + // Stop expanding hash tables if we're not reducing the input sufficiently. As our + // hash tables expand out of each level of cache hierarchy, every hash table lookup + // will take longer. We also may not be able to expand hash tables because of memory + // pressure. In this case HashTable::CheckAndResize() will fail. In either case we + // should always use the remaining space in the hash table to avoid wasting memory. 
+ if (ht_needs_expansion && ShouldExpandPreaggHashTables()) { + for (int i = 0; i < PARTITION_FANOUT; ++i) { + HashTable* ht = GetHashTable(i); + if (remaining_capacity[i] < child_batch->num_rows()) { + SCOPED_TIMER(ht_resize_timer_); + bool resized; + RETURN_IF_ERROR( + ht->CheckAndResize(child_batch->num_rows(), ht_ctx_.get(), &resized)); + if (resized) { + remaining_capacity[i] = ht->NumInsertsBeforeResize(); + } + } + } + } + + TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; + if (add_batch_streaming_impl_fn_ != nullptr) { + RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, needs_serialize_, prefetch_mode, + child_batch, out_batch, ht_ctx_.get(), remaining_capacity)); + } else { + RETURN_IF_ERROR(AddBatchStreamingImpl(needs_serialize_, prefetch_mode, child_batch, + out_batch, ht_ctx_.get(), remaining_capacity)); + } + + num_rows_returned_ += out_batch->num_rows(); + COUNTER_SET(num_passthrough_rows_, num_rows_returned_); + return Status::OK(); +} + +Status GroupingAggregator::InputDone() { + return MoveHashPartitions(num_input_rows_); +} + +Tuple* GroupingAggregator::ConstructIntermediateTuple( + const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) noexcept { + const int fixed_size = intermediate_tuple_desc_->byte_size(); + const int varlen_size = GroupingExprsVarlenSize(); + const int tuple_data_size = fixed_size + varlen_size; + uint8_t* tuple_data = pool->TryAllocate(tuple_data_size); + if (UNLIKELY(tuple_data == nullptr)) { + string details = Substitute("Cannot perform aggregation at aggregator with id $0. 
" + "Failed to allocate $1 bytes for intermediate tuple.", + id_, tuple_data_size); + *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size); + return nullptr; + } + memset(tuple_data, 0, fixed_size); + Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data); + uint8_t* varlen_data = tuple_data + fixed_size; + CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size); + InitAggSlots(agg_fn_evals, intermediate_tuple); + return intermediate_tuple; +} + +Tuple* GroupingAggregator::ConstructIntermediateTuple( + const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream, + Status* status) noexcept { + DCHECK(stream != nullptr && status != nullptr); + // Allocate space for the entire tuple in the stream. + const int fixed_size = intermediate_tuple_desc_->byte_size(); + const int varlen_size = GroupingExprsVarlenSize(); + const int tuple_size = fixed_size + varlen_size; + uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status); + if (UNLIKELY(tuple_data == nullptr)) { + // If we failed to allocate and did not hit an error (indicated by a non-ok status), + // the caller of this function can try to free some space, e.g. through spilling, and + // re-attempt to allocate space for this row. + return nullptr; + } + Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data); + tuple->Init(fixed_size); + uint8_t* varlen_buffer = tuple_data + fixed_size; + CopyGroupingValues(tuple, varlen_buffer, varlen_size); + InitAggSlots(agg_fn_evals, tuple); + stream->AddRowCustomEnd(tuple_size); + return tuple; +} + +int GroupingAggregator::GroupingExprsVarlenSize() { + int varlen_size = 0; + // TODO: The hash table could compute this as it hashes. + for (int expr_idx : string_grouping_exprs_) { + StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx)); + // Avoid branching by multiplying length by null bit. 
+ varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx); + } + return varlen_size; +} + +// TODO: codegen this function. +void GroupingAggregator::CopyGroupingValues( + Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size) { + // Copy over all grouping slots (the variable length data is copied below). + for (int i = 0; i < grouping_exprs_.size(); ++i) { + SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i]; + if (ht_ctx_->ExprValueNull(i)) { + intermediate_tuple->SetNull(slot_desc->null_indicator_offset()); + } else { + void* src = ht_ctx_->ExprValue(i); + void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset()); + memcpy(dst, src, slot_desc->slot_size()); + } + } + + for (int expr_idx : string_grouping_exprs_) { + if (ht_ctx_->ExprValueNull(expr_idx)) continue; + + SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx]; + // ptr and len were already copied to the fixed-len part of string value + StringValue* sv = reinterpret_cast<StringValue*>( + intermediate_tuple->GetSlot(slot_desc->tuple_offset())); + memcpy(buffer, sv->ptr, sv->len); + sv->ptr = reinterpret_cast<char*>(buffer); + buffer += sv->len; + } +} + +template <bool AGGREGATED_ROWS> +Status GroupingAggregator::AppendSpilledRow( + Partition* __restrict__ partition, TupleRow* __restrict__ row) { + DCHECK(!is_streaming_preagg_); + DCHECK(partition->is_spilled()); + BufferedTupleStream* stream = AGGREGATED_ROWS ? + partition->aggregated_row_stream.get() : + partition->unaggregated_row_stream.get(); + DCHECK(!stream->is_pinned()); + Status status; + if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); + RETURN_IF_ERROR(status); + + // Keep trying to free memory by spilling until we succeed or hit an error. + // Running out of partitions to spill is treated as an error by SpillPartition(). 
+ while (true) { + RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); + if (stream->AddRow(row, &status)) return Status::OK(); + RETURN_IF_ERROR(status); + } +} + +void GroupingAggregator::SetDebugOptions(const TDebugOptions& debug_options) { + debug_options_ = debug_options; +} + +string GroupingAggregator::DebugString(int indentation_level) const { + stringstream ss; + DebugString(indentation_level, &ss); + return ss.str(); +} + +void GroupingAggregator::DebugString(int indentation_level, stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "GroupingAggregator(" + << "intermediate_tuple_id=" << intermediate_tuple_id_ + << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_ + << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_) + << " agg_exprs=" << AggFn::DebugString(agg_fns_); + *out << ")"; +} + +Status GroupingAggregator::CreateHashPartitions(int level, int single_partition_idx) { + if (is_streaming_preagg_) DCHECK_EQ(level, 0); + if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) { + return Status( + TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH); + } + ht_ctx_->set_level(level); + + DCHECK(hash_partitions_.empty()); + int num_partitions_created = 0; + for (int i = 0; i < PARTITION_FANOUT; ++i) { + hash_tbls_[i] = nullptr; + if (single_partition_idx == -1 || i == single_partition_idx) { + Partition* new_partition = partition_pool_->Add(new Partition(this, level, i)); + ++num_partitions_created; + hash_partitions_.push_back(new_partition); + RETURN_IF_ERROR(new_partition->InitStreams()); + } else { + hash_partitions_.push_back(nullptr); + } + } + // Now that all the streams are reserved (meaning we have enough memory to execute + // the algorithm), allocate the hash tables. These can fail and we can still continue. 
+ for (int i = 0; i < PARTITION_FANOUT; ++i) { + Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; + if (partition->aggregated_row_stream == nullptr) { + // Failed to create the aggregated row stream - cannot create a hash table. + // Just continue with a NULL hash table so rows will be passed through. + DCHECK(is_streaming_preagg_); + } else { + bool got_memory; + RETURN_IF_ERROR(partition->InitHashTable(&got_memory)); + // Spill the partition if we cannot create a hash table for a merge aggregation. + if (UNLIKELY(!got_memory)) { + DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables"; + // If we're repartitioning, we will be writing aggregated rows first. + RETURN_IF_ERROR(partition->Spill(level > 0)); + } + } + hash_tbls_[i] = partition->hash_tbl.get(); + } + // In this case we did not have to repartition, so ensure that while building the hash + // table all rows will be inserted into the partition at 'single_partition_idx' in case + // a non deterministic grouping expression causes a row to hash to a different + // partition index. 
+ if (single_partition_idx != -1) { + Partition* partition = hash_partitions_[single_partition_idx]; + for (int i = 0; i < PARTITION_FANOUT; ++i) { + hash_partitions_[i] = partition; + hash_tbls_[i] = partition->hash_tbl.get(); + } + } + + COUNTER_ADD(partitions_created_, num_partitions_created); + if (!is_streaming_preagg_) { + COUNTER_SET(max_partition_level_, level); + } + return Status::OK(); +} + +Status GroupingAggregator::CheckAndResizeHashPartitions( + bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) { + DCHECK(!is_streaming_preagg_); + for (int i = 0; i < PARTITION_FANOUT; ++i) { + Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; + while (!partition->is_spilled()) { + { + SCOPED_TIMER(ht_resize_timer_); + bool resized; + RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized)); + if (resized) break; + } + RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows)); + } + } + return Status::OK(); +} + +Status GroupingAggregator::NextPartition() { + DCHECK(output_partition_ == nullptr); + + if (!is_in_subplan_ && spilled_partitions_.empty()) { + // All partitions are in memory. Release reservation that was used for previous + // partitions that is no longer needed. If we have spilled partitions, we want to + // hold onto all reservation in case it is needed to process the spilled partitions. + DCHECK(!buffer_pool_client()->has_unpinned_pages()); + Status status = reservation_manager_.ReleaseUnusedReservation(); + DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are " + << "no unpinned pages. " << status.GetDetail(); + } + + // Keep looping until we get to a partition that fits in memory. + Partition* partition = nullptr; + while (true) { + // First return partitions that are fully aggregated (and in memory). 
+ if (!aggregated_partitions_.empty()) { + partition = aggregated_partitions_.front(); + DCHECK(!partition->is_spilled()); + aggregated_partitions_.pop_front(); + break; + } + + // No aggregated partitions in memory - we should not be using any reservation aside + // from 'serialize_stream_'. + DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, + buffer_pool_client()->GetUsedReservation()) + << buffer_pool_client()->DebugString(); + + // Try to fit a single spilled partition in memory. We can often do this because + // we only need to fit 1/PARTITION_FANOUT of the data in memory. + // TODO: in some cases when the partition probably won't fit in memory it could + // be better to skip directly to repartitioning. + RETURN_IF_ERROR(BuildSpilledPartition(&partition)); + if (partition != nullptr) break; + + // If we can't fit the partition in memory, repartition it. + RETURN_IF_ERROR(RepartitionSpilledPartition()); + } + DCHECK(!partition->is_spilled()); + DCHECK(partition->hash_tbl.get() != nullptr); + DCHECK(partition->aggregated_row_stream->is_pinned()); + + output_partition_ = partition; + output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get()); + COUNTER_ADD(num_hash_buckets_, output_partition_->hash_tbl->num_buckets()); + return Status::OK(); +} + +Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) { + DCHECK(!spilled_partitions_.empty()); + DCHECK(!is_streaming_preagg_); + // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. + Partition* src_partition = spilled_partitions_.front(); + DCHECK(src_partition->is_spilled()); + + // Create a new hash partition from the rows of the spilled partition. This is simpler + // than trying to finish building a partially-built partition in place. We only + // initialise one hash partition that all rows in 'src_partition' will hash to. 
+ RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx)); + Partition* dst_partition = hash_partitions_[src_partition->idx]; + DCHECK(dst_partition != nullptr); + + // Rebuild the hash table over spilled aggregate rows then start adding unaggregated + // rows to the hash table. It's possible the partition will spill at either stage. + // In that case we need to finish processing 'src_partition' so that all rows are + // appended to 'dst_partition'. + // TODO: if the partition spills again but the aggregation reduces the input + // significantly, we could do better here by keeping the incomplete hash table in + // memory and only spilling unaggregated rows that didn't fit in the hash table + // (somewhat similar to the passthrough pre-aggregation). + RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get())); + RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get())); + src_partition->Close(false); + spilled_partitions_.pop_front(); + hash_partitions_.clear(); + + if (dst_partition->is_spilled()) { + PushSpilledPartition(dst_partition); + *built_partition = nullptr; + // Spilled the partition - we should not be using any reservation except from + // 'serialize_stream_'. + DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, + buffer_pool_client()->GetUsedReservation()) + << buffer_pool_client()->DebugString(); + } else { + *built_partition = dst_partition; + } + return Status::OK(); +} + +Status GroupingAggregator::RepartitionSpilledPartition() { + DCHECK(!spilled_partitions_.empty()); + DCHECK(!is_streaming_preagg_); + // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. + Partition* partition = spilled_partitions_.front(); + DCHECK(partition->is_spilled()); + + // Create the new hash partitions to repartition into. This will allocate a + // write buffer for each partition's aggregated row stream. 
+ RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1)); + COUNTER_ADD(num_repartitions_, 1); + + // Rows in this partition could have been spilled into two streams, depending + // on if it is an aggregated intermediate, or an unaggregated row. Aggregated + // rows are processed first to save a hash table lookup in AddBatchImpl(). + RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get())); + + // Prepare write buffers so we can append spilled rows to unaggregated partitions. + for (Partition* hash_partition : hash_partitions_) { + if (!hash_partition->is_spilled()) continue; + // The aggregated rows have been repartitioned. Free up at least a buffer's worth of + // reservation and use it to pin the unaggregated write buffer. + hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); + bool got_buffer; + RETURN_IF_ERROR( + hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) << "Accounted in min reservation" + << buffer_pool_client()->DebugString(); + } + RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get())); + + COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows()); + COUNTER_ADD(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows()); + + partition->Close(false); + spilled_partitions_.pop_front(); + + // Done processing this partition. Move the new partitions into + // spilled_partitions_/aggregated_partitions_. 
+ int64_t num_input_rows = partition->aggregated_row_stream->num_rows() + + partition->unaggregated_row_stream->num_rows(); + RETURN_IF_ERROR(MoveHashPartitions(num_input_rows)); + return Status::OK(); +} + +template <bool AGGREGATED_ROWS> +Status GroupingAggregator::ProcessStream(BufferedTupleStream* input_stream) { + DCHECK(!is_streaming_preagg_); + if (input_stream->num_rows() > 0) { + while (true) { + bool got_buffer = false; + RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer)); + if (got_buffer) break; + // Did not have a buffer to read the input stream. Spill and try again. + RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); + } + + TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode; + bool eos = false; + const RowDescriptor* desc = + AGGREGATED_ROWS ? &intermediate_row_desc_ : &input_row_desc_; + RowBatch batch(desc, state_->batch_size(), mem_tracker_.get()); + do { + RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos)); + RETURN_IF_ERROR( + AddBatchImpl<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get())); + RETURN_IF_ERROR(QueryMaintenance(state_)); + batch.Reset(); + } while (!eos); + } + input_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + return Status::OK(); +} + +Status GroupingAggregator::SpillPartition(bool more_aggregate_rows) { + int64_t max_freed_mem = 0; + int partition_idx = -1; + + // Iterate over the partitions and pick the largest partition that is not spilled. + for (int i = 0; i < hash_partitions_.size(); ++i) { + if (hash_partitions_[i] == nullptr) continue; + if (hash_partitions_[i]->is_closed) continue; + if (hash_partitions_[i]->is_spilled()) continue; + // Pass 'true' because we need to keep the write block pinned. See Partition::Spill(). 
+ int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true); + mem += hash_partitions_[i]->hash_tbl->ByteSize(); + mem += hash_partitions_[i]->agg_fn_perm_pool->total_reserved_bytes(); + DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory. + if (mem > max_freed_mem) { + max_freed_mem = mem; + partition_idx = i; + } + } + DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to " + << "reclaim memory: " + << buffer_pool_client()->DebugString(); + // Remove references to the destroyed hash table from 'hash_tbls_'. + // Additionally, we might be dealing with a rebuilt spilled partition, where all + // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_' + // remains consistent in that case. + for (int i = 0; i < PARTITION_FANOUT; ++i) { + if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr; + } + return hash_partitions_[partition_idx]->Spill(more_aggregate_rows); +} + +Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) { + DCHECK(!hash_partitions_.empty()); + stringstream ss; + ss << "PA(node_id=" << id_ << ") partitioned(level=" << hash_partitions_[0]->level + << ") " << num_input_rows << " rows into:" << endl; + for (int i = 0; i < hash_partitions_.size(); ++i) { + Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; + // We might be dealing with a rebuilt spilled partition, where all partitions are + // pointing to a single in-memory partition, so make sure we only proceed for the + // right partition. 
+ if (i != partition->idx) continue; + int64_t aggregated_rows = 0; + if (partition->aggregated_row_stream != nullptr) { + aggregated_rows = partition->aggregated_row_stream->num_rows(); + } + int64_t unaggregated_rows = 0; + if (partition->unaggregated_row_stream != nullptr) { + unaggregated_rows = partition->unaggregated_row_stream->num_rows(); + } + double total_rows = aggregated_rows + unaggregated_rows; + double percent = total_rows * 100 / num_input_rows; + ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled") + << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl + << " #aggregated rows:" << aggregated_rows << endl + << " #unaggregated rows: " << unaggregated_rows << endl; + + // TODO: update counters to support doubles. + COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent)); + + if (total_rows == 0) { + partition->Close(false); + } else if (partition->is_spilled()) { + PushSpilledPartition(partition); + } else { + aggregated_partitions_.push_back(partition); + } + } + VLOG(2) << ss.str(); + hash_partitions_.clear(); + return Status::OK(); +} + +void GroupingAggregator::PushSpilledPartition(Partition* partition) { + DCHECK(partition->is_spilled()); + DCHECK(partition->hash_tbl == nullptr); + // Ensure all pages in the spilled partition's streams are unpinned by invalidating + // the streams' read and write iterators. We may need all the memory to process the + // next spilled partitions. 
+ partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); + partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); + spilled_partitions_.push_front(partition); +} + +void GroupingAggregator::ClosePartitions() { + for (Partition* partition : hash_partitions_) { + if (partition != nullptr) partition->Close(true); + } + hash_partitions_.clear(); + for (Partition* partition : aggregated_partitions_) partition->Close(true); + aggregated_partitions_.clear(); + for (Partition* partition : spilled_partitions_) partition->Close(true); + spilled_partitions_.clear(); + memset(hash_tbls_, 0, sizeof(hash_tbls_)); + partition_pool_->Clear(); +} + +int64_t GroupingAggregator::MinReservation() const { + // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe. + if (is_streaming_preagg_) { + // Reserve at least one buffer and a 64kb hash table per partition. + return (resource_profile_.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT; + } + int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0); + // Two of the buffers must fit the maximum row. 
+ return resource_profile_.spillable_buffer_size * (num_buffers - 2) + + resource_profile_.max_row_buffer_size * 2; +} + +Status GroupingAggregator::QueryMaintenance(RuntimeState* state) { + expr_results_pool_->Clear(); + return state->CheckQueryState(); +} + +BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() { + return reservation_manager_.buffer_pool_client(); +} + +Status GroupingAggregator::CodegenAddBatchImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) { + llvm::Function* update_tuple_fn; + RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn)); + + // Get the cross compiled update row batch function + IRFunction::Type ir_fn = IRFunction::GROUPING_AGG_ADD_BATCH_IMPL; + llvm::Function* add_batch_impl_fn = codegen->GetFunction(ir_fn, true); + DCHECK(add_batch_impl_fn != nullptr); + + int replaced; + // Codegen for grouping using hash table + + // Replace prefetch_mode with constant so branches can be optimised out. + llvm::Value* prefetch_mode_arg = codegen->GetArgument(add_batch_impl_fn, 3); + prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode)); + + // The codegen'd AddBatchImpl function is only used in Open() with level_ = 0, + // so don't use murmur hash + llvm::Function* hash_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, /* use murmur */ false, &hash_fn)); + + // Codegen HashTable::Equals<true> + llvm::Function* build_equals_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn)); + + // Codegen for evaluating input rows + llvm::Function* eval_grouping_expr_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn)); + + // Replace call sites + replaced = + codegen->ReplaceCallSites(add_batch_impl_fn, eval_grouping_expr_fn, "EvalProbeRow"); + DCHECK_EQ(replaced, 1); + + replaced = codegen->ReplaceCallSites(add_batch_impl_fn, hash_fn, "HashRow"); + DCHECK_EQ(replaced, 1); + + replaced = codegen->ReplaceCallSites(add_batch_impl_fn, 
build_equals_fn, "Equals"); + DCHECK_EQ(replaced, 1); + + HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = false; + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants( + codegen, stores_duplicates, 1, add_batch_impl_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_GE(replaced_constants.finds_some_nulls, 1); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); + + replaced = codegen->ReplaceCallSites(add_batch_impl_fn, update_tuple_fn, "UpdateTuple"); + DCHECK_GE(replaced, 1); + add_batch_impl_fn = codegen->FinalizeFunction(add_batch_impl_fn); + if (add_batch_impl_fn == nullptr) { + return Status("GroupingAggregator::CodegenAddBatchImpl(): codegen'd " + "AddBatchImpl() function failed verification, see log"); + } + + void** codegened_fn_ptr = reinterpret_cast<void**>(&add_batch_impl_fn_); + codegen->AddFunctionToJit(add_batch_impl_fn, codegened_fn_ptr); + return Status::OK(); +} + +Status GroupingAggregator::CodegenAddBatchStreamingImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) { + DCHECK(is_streaming_preagg_); + + IRFunction::Type ir_fn = IRFunction::GROUPING_AGG_ADD_BATCH_STREAMING_IMPL; + llvm::Function* add_batch_streaming_impl_fn = codegen->GetFunction(ir_fn, true); + DCHECK(add_batch_streaming_impl_fn != nullptr); + + // Make needs_serialize arg constant so dead code can be optimised out. + llvm::Value* needs_serialize_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 2); + needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_)); + + // Replace prefetch_mode with constant so branches can be optimised out. 
+ llvm::Value* prefetch_mode_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 3); + prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode)); + + llvm::Function* update_tuple_fn; + RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn)); + + // We only use the top-level hash function for streaming aggregations. + llvm::Function* hash_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn)); + + // Codegen HashTable::Equals + llvm::Function* equals_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &equals_fn)); + + // Codegen for evaluating input rows + llvm::Function* eval_grouping_expr_fn; + RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn)); + + // Replace call sites + int replaced = codegen->ReplaceCallSites( + add_batch_streaming_impl_fn, update_tuple_fn, "UpdateTuple"); + DCHECK_EQ(replaced, 2); + + replaced = codegen->ReplaceCallSites( + add_batch_streaming_impl_fn, eval_grouping_expr_fn, "EvalProbeRow"); + DCHECK_EQ(replaced, 1); + + replaced = codegen->ReplaceCallSites(add_batch_streaming_impl_fn, hash_fn, "HashRow"); + DCHECK_EQ(replaced, 1); + + replaced = codegen->ReplaceCallSites(add_batch_streaming_impl_fn, equals_fn, "Equals"); + DCHECK_EQ(replaced, 1); + + HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = false; + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants( + codegen, stores_duplicates, 1, add_batch_streaming_impl_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_GE(replaced_constants.finds_some_nulls, 1); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); + + DCHECK(add_batch_streaming_impl_fn != nullptr); + add_batch_streaming_impl_fn = codegen->FinalizeFunction(add_batch_streaming_impl_fn); + if (add_batch_streaming_impl_fn == nullptr) { + return 
Status("GroupingAggregator::CodegenAddBatchStreamingImpl(): codegen'd " + "AddBatchStreamingImpl() function failed verification, see log"); + } + + codegen->AddFunctionToJit(add_batch_streaming_impl_fn, + reinterpret_cast<void**>(&add_batch_streaming_impl_fn_)); + return Status::OK(); +} + +// Instantiate required templates. +template Status GroupingAggregator::AppendSpilledRow<false>(Partition*, TupleRow*); +template Status GroupingAggregator::AppendSpilledRow<true>(Partition*, TupleRow*); +} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator.h ---------------------------------------------------------------------- diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h new file mode 100644 index 0000000..0d1b893 --- /dev/null +++ b/be/src/exec/grouping-aggregator.h @@ -0,0 +1,624 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_GROUPING_AGGREGATOR_H +#define IMPALA_EXEC_GROUPING_AGGREGATOR_H + +#include <deque> +#include <memory> +#include <vector> + +#include "exec/aggregator.h" +#include "exec/hash-table.h" +#include "runtime/buffered-tuple-stream.h" +#include "runtime/bufferpool/suballocator.h" +#include "runtime/descriptors.h" +#include "runtime/mem-pool.h" +#include "runtime/reservation-manager.h" + +namespace impala { + +class AggFnEvaluator; +class LlvmCodeGen; +class RowBatch; +class RuntimeState; +class Tuple; + +/// Aggregator for doing grouping aggregations. Input is passed to the aggregator through +/// AddBatch(), or AddBatchStreaming() if this is a pre-agg. Then: +/// 1. Each row is hashed and we pick a dst partition (hash_partitions_). +/// 2. 
If the dst partition is not spilled, we probe into the partition's hash table +/// to aggregate/insert the row. +/// 3. If the partition is already spilled, the input row is spilled. +/// 4. When all the input is consumed, we walk hash_partitions_, put the spilled ones +/// into spilled_partitions_ and the non-spilled ones into aggregated_partitions_. +/// aggregated_partitions_ contains partitions that are fully processed and the result +/// can just be returned. Partitions in spilled_partitions_ need to be repartitioned +/// and we just repeat these steps. +/// +/// Each partition contains these structures: +/// 1) Hash Table for aggregated rows. This contains just the hash table directory +/// structure but not the rows themselves. This is NULL for spilled partitions when +/// we stop maintaining the hash table. +/// 2) MemPool for var-len result data for rows in the hash table. If the aggregate +/// function returns a string, we cannot append it to the tuple stream as that +/// structure is immutable. Instead, when we need to spill, we sweep and copy the +/// rows into a tuple stream. +/// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream +/// contains rows that are aggregated. When the partition is not spilled, this stream +/// is pinned and contains the memory referenced by the hash table. +/// In the case where the aggregate function does not return a string (meaning the +/// size of all the slots is known when the row is constructed), this stream contains +/// all the memory for the result rows and the MemPool (2) is not used. +/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows. +/// Rows in this stream always have child(0)'s layout. +/// +/// Buffering: Each stream and hash table needs to maintain at least one buffer when +/// it is being read or written. The streams for a given agg use a uniform buffer size, +/// except when processing rows larger than that buffer size.
In that case, the agg uses +/// BufferedTupleStream's variable buffer size support to handle larger rows up to the +/// maximum row size. Only two max-sized buffers are needed for the agg to spill: one +/// to hold rows being read from a spilled input stream and another for a temporary write +/// buffer when adding a row to an output stream. +/// +/// Two-phase aggregation: we support two-phase distributed aggregations, where +/// pre-aggregations attempt to reduce the size of data before shuffling data across the +/// network to be merged by the merge aggregation node. This aggregator supports a +/// streaming mode for pre-aggregations where it maintains a hash table of aggregated +/// rows, but can pass through unaggregated rows (after transforming them into the +/// same tuple format as aggregated rows) when a heuristic determines that it is better +/// to send rows across the network instead of consuming additional memory and CPU +/// resources to expand its hash table. The planner decides whether a given +/// pre-aggregation should use the streaming preaggregation algorithm or the same +/// blocking aggregation algorithm as used in merge aggregations. +/// TODO: make this less of a heuristic by factoring in the cost of the exchange vs the +/// cost of the pre-aggregation. +/// +/// Handling memory pressure: the node uses two different strategies for responding to +/// memory pressure, depending on whether it is a streaming pre-aggregation or not. If +/// the node is a streaming preaggregation, it stops growing its hash table further by +/// converting unaggregated rows into the aggregated tuple format and passing them +/// through. If the node is not a streaming pre-aggregation, it responds to memory +/// pressure by spilling partitions to disk. +/// +/// TODO: Buffer rows before probing into the hash table? +/// TODO: After spilling, we can still maintain a very small hash table just to remove +/// some number of rows (that would likely otherwise go to disk).
+/// TODO: Consider allowing to spill the hash table structure in addition to the rows. +/// TODO: Do we want to insert a buffer before probing into the partition's hash table? +/// TODO: Use a prefetch/batched probe interface. +/// TODO: Return rows from the aggregated_row_stream rather than the HT. +/// TODO: Think about spilling heuristic. +/// TODO: When processing a spilled partition, we have a lot more information and can +/// size the partitions/hash tables better. +/// TODO: Start with unpartitioned (single partition) and switch to partitioning and +/// spilling only if the size gets large, say larger than the LLC. +/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and ctx. +/// There are so many contexts in use that a plain "ctx" variable should never be used. +/// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this. +/// TODO: support an Init() method with an initial value in the UDAF interface. +class GroupingAggregator : public Aggregator { + public: + GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; + virtual Status Prepare(RuntimeState* state) override; + virtual void Codegen(RuntimeState* state) override; + virtual Status Open(RuntimeState* state) override; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status Reset(RuntimeState* state) override; + virtual void Close(RuntimeState* state) override; + + virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override; + /// Used to insert input rows if this is a streaming pre-agg. Tries to aggregate all of + /// the rows of 'child_batch', but if there isn't enough memory available rows will be + /// streamed through and returned in 'out_batch'. AddBatch() and AddBatchStreaming() + /// should not be called on the same GroupingAggregator. 
+ Status AddBatchStreaming( + RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch); + virtual Status InputDone() override WARN_UNUSED_RESULT; + + virtual int num_grouping_exprs() override { return grouping_exprs_.size(); } + + virtual void SetDebugOptions(const TDebugOptions& debug_options) override; + + virtual std::string DebugString(int indentation_level = 0) const override; + virtual void DebugString(int indentation_level, std::stringstream* out) const override; + + private: + struct Partition; + + /// Number of initial partitions to create. Must be a power of 2. + static const int PARTITION_FANOUT = 16; + + /// Needs to be the log(PARTITION_FANOUT). + /// We use the upper bits to pick the partition and lower bits in the HT. + /// TODO: different hash functions here too? We don't need that many bits to pick + /// the partition so this might be okay. + static const int NUM_PARTITIONING_BITS = 4; + + /// Maximum number of times we will repartition. The maximum build table we can process + /// (if we have enough scratch disk space) in case there is no skew is: + /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). + /// In the case where there is skew, repartitioning is unlikely to help (assuming a + /// reasonable hash function). + /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx. + /// TODO: we can revisit and try harder to explicitly detect skew. + static const int MAX_PARTITION_DEPTH = 16; + + /// Default initial number of buckets in a hash table. + /// TODO: rethink this ? + static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024; + + /// Codegen doesn't allow for automatic Status variables because then exception + /// handling code is needed to destruct the Status, and our function call substitution + /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by + /// placing the Status here so exceptions won't need to destruct it. + /// TODO: fix IMPALA-1948 and remove this. 
+ Status add_batch_status_; + + /// Row with the intermediate tuple as its only tuple. + /// Construct a new row desc for preparing the build exprs because neither the child's + /// nor this node's output row desc may contain the intermediate tuple, e.g., + /// in a single-node plan with an intermediate tuple different from the output tuple. + /// Lives in the query state's obj_pool. + RowDescriptor intermediate_row_desc_; + + /// True if this is first phase of a two-phase distributed aggregation for which we + /// are doing a streaming preaggregation. + const bool is_streaming_preagg_; + + /// True if any of the evaluators require the serialize step. + bool needs_serialize_; + + /// Exprs used to evaluate input rows + std::vector<ScalarExpr*> grouping_exprs_; + + /// Exprs used to insert constructed aggregation tuple into the hash table. + /// All the exprs are simply SlotRefs for the intermediate tuple. + std::vector<ScalarExpr*> build_exprs_; + + /// Indices of grouping exprs with var-len string types in grouping_exprs_. + /// We need to do more work for var-len expressions when allocating and spilling rows. + /// All var-len grouping exprs have type string. + std::vector<int> string_grouping_exprs_; + + RuntimeState* state_; + + /// Allocator for hash table memory. + std::unique_ptr<Suballocator> ht_allocator_; + + /// MemPool used to allocate memory during Close() when creating new output tuples. The + /// pool should not be Reset() to allow amortizing memory allocation over a series of + /// Reset()/Open()/GetNext()* calls. + std::unique_ptr<MemPool> tuple_pool_; + + /// The current partition and iterator to the next row in its hash table that we need + /// to return in GetNext() + Partition* output_partition_; + HashTable::Iterator output_iterator_; + + /// Resource information sent from the frontend. 
+ const TBackendResourceProfile resource_profile_; + + ReservationManager reservation_manager_; + BufferPool::ClientHandle* buffer_pool_client(); + + /// The number of rows that have been passed to AddBatch() or AddBatchStreaming(). + int64_t num_input_rows_; + + /// True if this aggregator is being executed in a subplan. + const bool is_in_subplan_; + + int64_t limit_; // -1: no limit + bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; } + + typedef Status (*AddBatchImplFn)( + GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*); + /// Jitted AddBatchImpl function pointer. Null if codegen is disabled. + AddBatchImplFn add_batch_impl_fn_; + + typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, bool, + TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]); + /// Jitted AddBatchStreamingImpl function pointer. Null if codegen is disabled. + AddBatchStreamingImplFn add_batch_streaming_impl_fn_; + + /// Total time spent resizing hash tables. + RuntimeProfile::Counter* ht_resize_timer_; + + /// Time spent returning the aggregated rows + RuntimeProfile::Counter* get_results_timer_; + + /// Total number of hash buckets across all partitions. + RuntimeProfile::Counter* num_hash_buckets_; + + /// Total number of partitions created. + RuntimeProfile::Counter* partitions_created_; + + /// Level of max partition (i.e. number of repartitioning steps). + RuntimeProfile::HighWaterMarkCounter* max_partition_level_; + + /// Number of rows that have been repartitioned. + RuntimeProfile::Counter* num_row_repartitioned_; + + /// Number of partitions that have been repartitioned. + RuntimeProfile::Counter* num_repartitions_; + + /// Number of partitions that have been spilled. + RuntimeProfile::Counter* num_spilled_partitions_; + + /// The largest fraction after repartitioning. This is expected to be + /// 1 / PARTITION_FANOUT. A value much larger indicates skew. 
+ RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_; + + /// Time spent in streaming preagg algorithm. + RuntimeProfile::Counter* streaming_timer_; + + /// The number of rows passed through without aggregation. + RuntimeProfile::Counter* num_passthrough_rows_; + + /// The estimated reduction of the preaggregation. + RuntimeProfile::Counter* preagg_estimated_reduction_; + + /// Expose the minimum reduction factor to continue growing the hash tables. + RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_; + + /// The estimated number of input rows from the planner. + int64_t estimated_input_cardinality_; + + TDebugOptions debug_options_; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// If true, no more rows to output from partitions. + bool partition_eos_; + + /// Used for hash-related functionality, such as evaluating rows and calculating hashes. + /// It also owns the evaluators for the grouping and build expressions used during hash + /// table insertion and probing. + boost::scoped_ptr<HashTableCtx> ht_ctx_; + + /// Object pool that holds the Partition objects in hash_partitions_. + std::unique_ptr<ObjectPool> partition_pool_; + + /// Current partitions we are partitioning into. IMPALA-5788: For the case where we + /// rebuild a spilled partition that fits in memory, all pointers in this vector will + /// point to a single in-memory partition. + std::vector<Partition*> hash_partitions_; + + /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we + /// rebuild a spilled partition that fits in memory, all pointers in this array will + /// point to the hash table that is a part of a single in-memory partition. + HashTable* hash_tbls_[PARTITION_FANOUT]; + + /// All partitions that have been spilled and need further processing. + std::deque<Partition*> spilled_partitions_; + + /// All partitions that are aggregated and can just return the results in GetNext(). 
+ /// After consuming all the input, hash_partitions_ is split into spilled_partitions_ + /// and aggregated_partitions_, depending on if it was spilled or not. + std::deque<Partition*> aggregated_partitions_; + + /// END: Members that must be Reset() + ///////////////////////////////////////// + + /// The hash table and streams (aggregated and unaggregated) for an individual + /// partition. The streams of each partition always (i.e. regardless of level) + /// initially use small buffers. Streaming pre-aggregations do not spill and do not + /// require an unaggregated stream. + struct Partition { + Partition(GroupingAggregator* parent, int level, int idx) + : parent(parent), is_closed(false), level(level), idx(idx) {} + + ~Partition(); + + /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling + /// aggregation), allocating one buffer for each. Spilling merge aggregations must + /// have enough reservation for the initial buffer for the stream, so this should + /// not fail due to OOM. Preaggregations do not reserve any buffers: if there is not + /// enough reservation for the initial buffer, the aggregated row stream is not + /// created and an OK status is returned. + Status InitStreams() WARN_UNUSED_RESULT; + + /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL. + /// Sets 'got_memory' to true if the hash table was initialised or false on OOM. + Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT; + + /// Called in case we need to serialize aggregated rows. This step effectively does + /// a merge aggregation in this aggregator. + Status SerializeStreamForSpilling() WARN_UNUSED_RESULT; + + /// Closes this partition. If finalize_rows is true, this iterates over all rows + /// in aggregated_row_stream and finalizes them (this is only used in the cancellation + /// path). + void Close(bool finalize_rows); + + /// Spill this partition. 
'more_aggregate_rows' = true means that more aggregate rows + /// may be appended to the partition before appending unaggregated rows. On + /// success, one of the streams is left with a write iterator: the aggregated stream + /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise. + Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT; + + bool is_spilled() const { return hash_tbl.get() == nullptr; } // NOTE(review): per the 'hash_tbl' comment, this is also true for a partition passing rows through without a hash table. + + GroupingAggregator* parent; + + /// If true, this partition is closed and there is nothing left to do. + bool is_closed; + + /// How many times rows in this partition have been repartitioned. Partitions created + /// from the aggregator's input are level 0, 1 after the first repartitioning, etc. + const int level; + + /// The index of this partition within 'hash_partitions_' at its level. + const int idx; + + /// Hash table for this partition. + /// Can be NULL if this partition is no longer maintaining a hash table (i.e. + /// is spilled or we are passing through all rows for this partition). + std::unique_ptr<HashTable> hash_tbl; + + /// Clone of parent's agg_fn_evals_. Permanent allocations come from + /// 'agg_fn_perm_pool' and result allocations come from 'expr_results_pool_'. + std::vector<AggFnEvaluator*> agg_fn_evals; + + /// Pool for permanent allocations for this partition's 'agg_fn_evals'. Freed at the + /// same times as 'agg_fn_evals' are closed: either when the partition is closed or + /// when it is spilled. + std::unique_ptr<MemPool> agg_fn_perm_pool; + + /// Tuple stream used to store aggregated rows. When the partition is not spilled, + /// (meaning the hash table is maintained), this stream is pinned and contains the + /// memory referenced by the hash table. When it is spilled, this consumes reservation + /// for a write buffer only during repartitioning of aggregated rows. + /// + /// For streaming preaggs, this may be NULL if sufficient memory is not available. 
+ /// In that case hash_tbl is also NULL and all rows for the partition will be passed + /// through. + std::unique_ptr<BufferedTupleStream> aggregated_row_stream; + + /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations. + /// Always unpinned. Has a write buffer allocated when the partition is spilled and + /// unaggregated rows are being processed. + std::unique_ptr<BufferedTupleStream> unaggregated_row_stream; + }; + + /// Stream used to store serialized spilled rows. Only used if needs_serialize_ + /// is set. This stream is never pinned and only used in Partition::Spill as + /// a temporary buffer. + std::unique_ptr<BufferedTupleStream> serialize_stream_; + + /// Accessor for 'hash_tbls_' that DCHECKs consistency with 'hash_partitions_'. + HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { + HashTable* ht = hash_tbls_[partition_idx]; + DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get()); + return ht; + } + + /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_' + /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial + /// values. Returns NULL if there was not enough memory to allocate the tuple or if errors + /// occurred, in which case 'status' is set. Allocates tuple and var-len data for + /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the + /// FunctionContexts, so is stored outside the stream. If stream's small buffers get + /// full, it will attempt to switch to IO-buffers. + Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, + BufferedTupleStream* stream, Status* status) noexcept; + + /// Constructs intermediate tuple, allocating memory from pool instead of the stream. + /// Returns NULL and sets status if there is not enough memory to allocate the tuple. 
+ Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, + MemPool* pool, Status* status) noexcept; + + /// Returns the number of bytes of variable-length data for the grouping values stored + /// in 'ht_ctx_'. + int GroupingExprsVarlenSize(); + + /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_' + /// that were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the + /// var-len data into 'buffer'. 'buffer' points to the start of a buffer of at least the + /// size of the variable-length data: 'varlen_size'. + void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size); + + /// Processes a batch of rows. This is the core function of the algorithm. We partition + /// the rows into hash_partitions_, spilling as necessary. + /// If AGGREGATED_ROWS is true, it means that the rows in the batch are already + /// pre-aggregated. + /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE, + /// hash table buckets will be prefetched based on the hash values computed. Note + /// that 'prefetch_mode' will be substituted with constants during codegen time. + /// + /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for + /// performance. + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE AddBatchImpl(RowBatch* batch, TPrefetchMode::type prefetch_mode, + HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; + + /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in + /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on + /// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use. + /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be + /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant. 
+ template <bool AGGREGATED_ROWS> + void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, + TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx); + + /// This function processes each individual row in AddBatchImpl(). Must be inlined into + /// AddBatchImpl for codegen to substitute function calls with codegen'd versions. + /// May spill partitions if not enough memory is available. + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE ProcessRow( + TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; + + /// Create a new intermediate tuple in 'partition', initialized with 'row'. 'hash' is + /// the precomputed hash of the row. The row can be an unaggregated or aggregated row + /// depending on the template parameter + /// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate + /// tuple to the partition's stream. Must be inlined into AddBatchImpl for codegen + /// to substitute function calls with codegen'd versions. 'insert_it' is an iterator + /// for insertion returned from HashTable::FindBuildRowBucket(). + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* row, + uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT; + + /// Append a row to a spilled partition. The row may be aggregated or unaggregated + /// according to AGGREGATED_ROWS. May spill partitions if needed to get the + /// buffers required to append the row. + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE AppendSpilledRow( + Partition* partition, TupleRow* row) WARN_UNUSED_RESULT; + + /// Reads all the rows from input_stream and processes them by calling AddBatchImpl(). + template <bool AGGREGATED_ROWS> + Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT; + + /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to + /// true if all rows from all partitions have been returned or the limit is reached. 
+ Status GetRowsFromPartition( + RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; + + /// Return true if we should keep expanding hash tables in the preagg. If false, + /// the preagg should pass through any rows it can't fit in its tables. + bool ShouldExpandPreaggHashTables() const; + + /// Streaming processing of in_batch from child. Rows from child are either aggregated + /// into the hash table or added to 'out_batch' in the intermediate tuple format. + /// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to + /// store all of the rows in 'in_batch'. + /// 'needs_serialize' is an argument so that codegen can replace it with a constant, + /// rather than using the member variable 'needs_serialize_'. + /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE, + /// hash table buckets will be prefetched based on the hash values computed. Note + /// that 'prefetch_mode' will be substituted with constants during codegen time. + /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of + /// additional rows that can be added to the hash table per partition. It is updated + /// by AddBatchStreamingImpl() when it inserts new rows. + /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser. + Status AddBatchStreamingImpl(bool needs_serialize, TPrefetchMode::type prefetch_mode, + RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx, + int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT; + + /// Tries to add an intermediate tuple for 'in_row' to the hash table 'hash_tbl' of 'partition' for streaming + /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set + /// to the corresponding hash. If the tuple already exists in the hash table, update + /// the tuple and return true. Otherwise try to create a new entry in the hash table, + /// returning true if successful or false if the table is full. 
'remaining_capacity' + /// keeps track of how many more entries can be added to the hash table so we can avoid + /// retrying inserts. It is decremented if an insert succeeds and set to zero if an + /// insert fails. If an error occurs, returns false and sets 'status'. + bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* partition, + HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity, + Status* status) WARN_UNUSED_RESULT; + + /// Initializes hash_partitions_. 'level' is the level for the partitions to create. + /// If 'single_partition_idx' is provided (i.e. is not the default -1), it must be a number in range + /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it. + /// Also sets ht_ctx_'s level to 'level'. + Status CreateHashPartitions( + int level, int single_partition_idx = -1) WARN_UNUSED_RESULT; + + /// Ensure that hash tables for all in-memory partitions are large enough to fit + /// 'num_rows' additional hash table entries. If there is not enough memory to + /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if + /// we're currently partitioning aggregated rows. + Status CheckAndResizeHashPartitions( + bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; + + /// Prepares the next partition to return results from. On return, this function + /// initializes output_iterator_ and output_partition_. This either removes + /// a partition from aggregated_partitions_ (and is done) or removes the next + /// partition from spilled_partitions_ and repartitions it. + Status NextPartition() WARN_UNUSED_RESULT; + + /// Tries to build the first partition in 'spilled_partitions_'. + /// If successful, set *built_partition to the partition. The caller owns the partition + /// and is responsible for closing it. 
If unsuccessful because the partition could not + /// fit in memory, set *built_partition to NULL and push the spilled partition back onto the + /// front of 'spilled_partitions_' so it can be processed by + /// RepartitionSpilledPartition(). + Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT; + + /// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT + /// output partitions. On success, each output partition is either: + /// * closed, if no rows were added to the partition. + /// * in 'spilled_partitions_', if the partition spilled. + /// * in 'aggregated_partitions_', if the output partition was not spilled. + Status RepartitionSpilledPartition() WARN_UNUSED_RESULT; + + /// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is passed + /// to Partition::Spill() when spilling the partition. See the Partition::Spill() + /// comment for further explanation. + Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT; + + /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or + /// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned. + /// input_rows is the number of input rows that have been repartitioned. + /// Used for diagnostics. + Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT; + + /// Adds a partition to the front of 'spilled_partitions_' for later processing. + /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed + /// first. This allows us to delete pages earlier and bottom out the recursion + /// earlier and also improves time locality of access to spilled data on disk. + void PushSpilledPartition(Partition* partition); + + /// Calls Close() on every Partition in 'aggregated_partitions_', + /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists, + /// the vector and the partition pool. 
+ void ClosePartitions(); + + /// Calls finalize on all hash table tuples starting at 'it'. + void CleanupHashTbl( + const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it); + + /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState(). + /// Aggregators should call this periodically, e.g. once per input row batch. This + /// should not be called outside the main execution thread. + /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details. + Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT; + + /// Codegen the non-streaming add row batch loop. The loop has already been compiled to + /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR. + /// This function will modify the loop substituting the statically compiled functions + /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened + /// function. + /// Assumes AGGREGATED_ROWS = false. + Status CodegenAddBatchImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; + + /// Codegen the materialization loop for streaming preaggregations. + /// 'add_batch_streaming_impl_fn_' will be updated with the codegened function. + Status CodegenAddBatchStreamingImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; + + /// Compute minimum buffer reservation for grouping aggregations. + /// We need one buffer per partition, which is used either as the write buffer for the + /// aggregated stream or the unaggregated stream. We need an additional buffer to read + /// the stream we are currently repartitioning. The read buffer needs to be a max-sized + /// buffer to hold a max-sized row and we need one max-sized write buffer that is used + /// temporarily to append a row to any stream. 
+ /// + /// If we need to serialize, we need an additional buffer while spilling a partition + /// as the partition's aggregated stream needs to be serialized and rewritten. + /// We do not spill streaming preaggregations, so we do not need to reserve any buffers. + int64_t MinReservation() const; +}; +} // namespace impala + +#endif // IMPALA_EXEC_GROUPING_AGGREGATOR_H http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/non-grouping-aggregator-ir.cc b/be/src/exec/non-grouping-aggregator-ir.cc new file mode 100644 index 0000000..335cbc7 --- /dev/null +++ b/be/src/exec/non-grouping-aggregator-ir.cc @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/non-grouping-aggregator.h" + +#include "runtime/row-batch.h" + +using namespace impala; + +Status NonGroupingAggregator::AddBatchImpl(RowBatch* batch) { // Calls UpdateTuple() on 'singleton_output_tuple_' for every row of 'batch'; always returns OK. + Tuple* output_tuple = singleton_output_tuple_; + FOREACH_ROW(batch, 0, batch_iter) { // Every row updates the same output tuple. + UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.Get()); + } + return Status::OK(); +}
