IMPALA-110 (part 2): Refactor PartitionedAggregationNode

This patch refactors PartitionedAggregationNode in preparation for supporting multiple distinct operators in a query.
The primary goal of the refactor is to separate out the core aggregation functionality into a new type of object called an Aggregator. For now, each aggregation ExecNode will contain a single Aggregator. Future patches will then extend the aggregation ExecNode to take a single input and process it with multiple Aggregators, allowing us to support more exotic combinations of aggregate functions and groupings.

Specifically, this patch splits PartitionedAggregationNode into five new classes:
- Aggregator: a superclass containing the functionality that's shared between GroupingAggregator and NonGroupingAggregator.
- GroupingAggregator: this class contains the bulk of the interesting aggregation code, including everything related to creating and updating partitions and hash tables, spilling, etc.
- NonGroupingAggregator: this class handles aggregations that don't have grouping exprs. Since these aggregations always result in just a single output row, the functionality here is relatively simple (e.g., no spilling or streaming).
- StreamingAggregationNode: this node performs a streaming preaggregation, where the input is retrieved from the child during GetNext() and passed to the GroupingAggregator (non-grouping aggregations do not support streaming). Eventually, we'll support a list of GroupingAggregators.
- AggregationNode: this node performs a final aggregation, where the input is retrieved from the child during Open() and passed to the Aggregator. Currently the Aggregator can be either grouping or non-grouping. Eventually, we'll support a list of GroupingAggregators and/or a single NonGroupingAggregator.

Testing:
- Passed a full exhaustive run.
Change-Id: I9e7bb583f54aa4add3738bde7f57cf3511ac567e Reviewed-on: http://gerrit.cloudera.org:8080/10394 Reviewed-by: Thomas Marshall <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/010321d4 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/010321d4 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/010321d4 Branch: refs/heads/master Commit: 010321d404a9ad5b7338eeb24a4e9ac576cf4dff Parents: dde9308 Author: Thomas Tauber-Marshall <[email protected]> Authored: Thu May 10 19:39:57 2018 +0000 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Jul 2 21:07:35 2018 +0000 ---------------------------------------------------------------------- be/src/codegen/gen_ir_descriptions.py | 14 +- be/src/codegen/impala-ir.cc | 3 +- be/src/exec/CMakeLists.txt | 10 +- be/src/exec/aggregation-node.cc | 132 ++ be/src/exec/aggregation-node.h | 60 + be/src/exec/aggregator.cc | 609 ++++++ be/src/exec/aggregator.h | 211 ++ be/src/exec/exec-node.cc | 9 +- be/src/exec/exec-node.h | 9 +- be/src/exec/grouping-aggregator-ir.cc | 241 +++ be/src/exec/grouping-aggregator-partition.cc | 218 +++ be/src/exec/grouping-aggregator.cc | 1098 +++++++++++ be/src/exec/grouping-aggregator.h | 624 ++++++ be/src/exec/non-grouping-aggregator-ir.cc | 30 + be/src/exec/non-grouping-aggregator.cc | 174 ++ be/src/exec/non-grouping-aggregator.h | 111 ++ be/src/exec/partitioned-aggregation-node-ir.cc | 253 --- be/src/exec/partitioned-aggregation-node.cc | 1955 ------------------- be/src/exec/partitioned-aggregation-node.h | 734 ------- be/src/exec/streaming-aggregation-node.cc | 153 ++ be/src/exec/streaming-aggregation-node.h | 85 + 21 files changed, 3774 insertions(+), 2959 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/gen_ir_descriptions.py ---------------------------------------------------------------------- diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py index dd2df9e..99a97ae 100755 --- a/be/src/codegen/gen_ir_descriptions.py +++ b/be/src/codegen/gen_ir_descriptions.py @@ -47,14 +47,12 @@ ir_functions = [ "_ZNK6impala14AggFnEvaluator11input_evalsEv"], ["AGG_FN_EVALUATOR_AGG_FN_CTX", "_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv"], - ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED", - "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"], - ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED", - "_ZN6impala26PartitionedAggregationNode12ProcessBatchILb1EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"], - ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING", - "_ZN6impala26PartitionedAggregationNode22ProcessBatchNoGroupingEPNS_8RowBatchE"], - ["PART_AGG_NODE_PROCESS_BATCH_STREAMING", - "_ZN6impala26PartitionedAggregationNode21ProcessBatchStreamingEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"], + ["GROUPING_AGG_ADD_BATCH_IMPL", + "_ZN6impala18GroupingAggregator12AddBatchImplILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"], + ["NON_GROUPING_AGG_ADD_BATCH_IMPL", + "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"], + ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL", + "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"], ["AVG_UPDATE_BIGINT", "_ZN6impala18AggregateFunctions9AvgUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE"], ["AVG_UPDATE_DOUBLE", http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/codegen/impala-ir.cc ---------------------------------------------------------------------- diff --git 
a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc index 9c5b3eb..0fa4fe9 100644 --- a/be/src/codegen/impala-ir.cc +++ b/be/src/codegen/impala-ir.cc @@ -26,11 +26,12 @@ #pragma clang diagnostic ignored "-Wheader-hygiene" #include "codegen/codegen-anyval-ir.cc" +#include "exec/grouping-aggregator-ir.cc" #include "exec/hash-table-ir.cc" #include "exec/hdfs-avro-scanner-ir.cc" #include "exec/hdfs-parquet-scanner-ir.cc" #include "exec/hdfs-scanner-ir.cc" -#include "exec/partitioned-aggregation-node-ir.cc" +#include "exec/non-grouping-aggregator-ir.cc" #include "exec/partitioned-hash-join-builder-ir.cc" #include "exec/partitioned-hash-join-node-ir.cc" #include "exec/select-node-ir.cc" http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 77c6e15..1753cb0 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -25,6 +25,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") add_library(Exec + aggregation-node.cc + aggregator.cc analytic-eval-node.cc base-sequence-scanner.cc blocking-join-node.cc @@ -38,6 +40,9 @@ add_library(Exec exchange-node.cc external-data-source-executor.cc filter-context.cc + grouping-aggregator.cc + grouping-aggregator-ir.cc + grouping-aggregator-partition.cc hash-table.cc hbase-table-sink.cc hbase-table-writer.cc @@ -66,12 +71,12 @@ add_library(Exec incr-stats-util.cc nested-loop-join-builder.cc nested-loop-join-node.cc + non-grouping-aggregator.cc + non-grouping-aggregator-ir.cc parquet-column-readers.cc parquet-column-stats.cc parquet-metadata-utils.cc partial-sort-node.cc - partitioned-aggregation-node.cc - partitioned-aggregation-node-ir.cc partitioned-hash-join-builder.cc partitioned-hash-join-builder-ir.cc partitioned-hash-join-node.cc @@ -91,6 +96,7 @@ 
add_library(Exec select-node-ir.cc singular-row-src-node.cc sort-node.cc + streaming-aggregation-node.cc subplan-node.cc text-converter.cc topn-node.cc http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc new file mode 100644 index 0000000..d25284d --- /dev/null +++ b/be/src/exec/aggregation-node.cc @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/aggregation-node.h" + +#include <sstream> + +#include "exec/grouping-aggregator.h" +#include "exec/non-grouping-aggregator.h" +#include "gutil/strings/substitute.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "util/debug-util.h" +#include "util/runtime-profile-counters.h" + +#include "gen-cpp/PlanNodes_types.h" + +#include "common/names.h" + +namespace impala { + +AggregationNode::AggregationNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs) {} + +Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + if (tnode.agg_node.grouping_exprs.empty()) { + aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, state->desc_tbl())); + } else { + aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl())); + } + RETURN_IF_ERROR(aggregator_->Init(tnode, state)); + runtime_profile_->AddChild(aggregator_->runtime_profile()); + return Status::OK(); +} + +Status AggregationNode::Prepare(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecNode::Prepare(state)); + aggregator_->SetDebugOptions(debug_options_); + RETURN_IF_ERROR(aggregator_->Prepare(state)); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); + return Status::OK(); +} + +void AggregationNode::Codegen(RuntimeState* state) { + DCHECK(state->ShouldCodegen()); + ExecNode::Codegen(state); + if (IsNodeCodegenDisabled()) return; + aggregator_->Codegen(state); +} + +Status AggregationNode::Open(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + // Open the child before consuming resources in this node. 
+ RETURN_IF_ERROR(child(0)->Open(state)); + RETURN_IF_ERROR(ExecNode::Open(state)); + + RETURN_IF_ERROR(aggregator_->Open(state)); + + RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); + // Read all the rows from the child and process them. + bool eos = false; + do { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos)); + RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch)); + batch.Reset(); + } while (!eos); + + // The child can be closed at this point in most cases because we have consumed all of + // the input from the child and transfered ownership of the resources we need. The + // exception is if we are inside a subplan expecting to call Open()/GetNext() on the + // child again, + if (!IsInSubplan()) child(0)->Close(state); + + RETURN_IF_ERROR(aggregator_->InputDone()); + return Status::OK(); +} + +Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + if (ReachedLimit()) { + *eos = true; + return Status::OK(); + } + + RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, eos)); + num_rows_returned_ += row_batch->num_rows(); + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + return Status::OK(); +} + +Status AggregationNode::Reset(RuntimeState* state) { + RETURN_IF_ERROR(aggregator_->Reset(state)); + return ExecNode::Reset(state); +} + +void AggregationNode::Close(RuntimeState* state) { + if (is_closed()) return; + aggregator_->Close(state); + ExecNode::Close(state); +} + +void AggregationNode::DebugString(int indentation_level, stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "AggregationNode(" + << "aggregator=" << aggregator_->DebugString(); + 
ExecNode::DebugString(indentation_level, out); + *out << ")"; +} +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h new file mode 100644 index 0000000..527a62d --- /dev/null +++ b/be/src/exec/aggregation-node.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_AGGREGATION_NODE_H +#define IMPALA_EXEC_AGGREGATION_NODE_H + +#include <memory> + +#include "exec/aggregator.h" +#include "exec/exec-node.h" + +namespace impala { + +class RowBatch; +class RuntimeState; + +/// Node for doing partitioned hash aggregation. +/// This node consumes the input from child(0) during Open() and then passes it to the +/// Aggregator, which does the actual work of aggregating. 
+class AggregationNode : public ExecNode { + public: + AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; + virtual Status Prepare(RuntimeState* state) override; + virtual void Codegen(RuntimeState* state) override; + virtual Status Open(RuntimeState* state) override; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status Reset(RuntimeState* state) override; + virtual void Close(RuntimeState* state) override; + + virtual void DebugString(int indentation_level, std::stringstream* out) const override; + + private: + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Performs the actual work of aggregating input rows. + std::unique_ptr<Aggregator> aggregator_; + + /// END: Members that must be Reset() + ///////////////////////////////////////// +}; +} // namespace impala + +#endif // IMPALA_EXEC_AGGREGATION_NODE_H http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc new file mode 100644 index 0000000..70178cc --- /dev/null +++ b/be/src/exec/aggregator.cc @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/aggregator.h" + +#include <sstream> + +#include "codegen/codegen-anyval.h" +#include "codegen/llvm-codegen.h" +#include "exec/exec-node.h" +#include "exprs/agg-fn-evaluator.h" +#include "exprs/expr-value.h" +#include "exprs/scalar-expr.h" +#include "gutil/strings/substitute.h" +#include "runtime/descriptors.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/raw-value.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "runtime/tuple.h" +#include "util/runtime-profile-counters.h" + +#include "gen-cpp/PlanNodes_types.h" + +#include "common/names.h" + +namespace impala { + +const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator"; + +Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs, const std::string& name) + : id_(exec_node->id()), + pool_(pool), + intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id), + intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)), + output_tuple_id_(tnode.agg_node.output_tuple_id), + output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)), + row_desc_(*exec_node->row_desc()), + input_row_desc_(*exec_node->child(0)->row_desc()), + needs_finalize_(tnode.agg_node.need_finalize), + runtime_profile_(RuntimeProfile::Create(pool_, name)), + num_rows_returned_(0), + rows_returned_counter_(nullptr), + build_timer_(nullptr) {} + +Aggregator::~Aggregator() {} + +Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) { + 
DCHECK(intermediate_tuple_desc_ != nullptr); + DCHECK(output_tuple_desc_ != nullptr); + DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size()); + + int j = num_grouping_exprs(); + for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) { + SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; + SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; + AggFn* agg_fn; + RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], input_row_desc_, + *intermediate_slot_desc, *output_slot_desc, state, &agg_fn)); + agg_fns_.push_back(agg_fn); + } + + RETURN_IF_ERROR(ScalarExpr::Create(tnode.conjuncts, row_desc_, state, &conjuncts_)); + return Status::OK(); +} + +Status Aggregator::Prepare(RuntimeState* state) { + mem_tracker_.reset(new MemTracker( + runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker())); + expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false)); + expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get())); + expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get())); + + RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool_.get(), + expr_results_pool_.get(), &agg_fn_evals_)); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, + expr_perm_pool_.get(), expr_results_pool_.get(), &conjunct_evals_)); + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + + rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT); + build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime"); + + return Status::OK(); +} + +Status Aggregator::Open(RuntimeState* state) { + RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(conjunct_evals_, state)); + return Status::OK(); +} + +void Aggregator::Close(RuntimeState* state) { + // Close all the agg-fn-evaluators + AggFnEvaluator::Close(agg_fn_evals_, state); + 
AggFn::Close(agg_fns_); + ScalarExprEvaluator::Close(conjunct_evals_, state); + ScalarExpr::Close(conjuncts_); + + if (expr_perm_pool_.get() != nullptr) expr_perm_pool_->FreeAll(); + if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll(); + if (expr_mem_tracker_.get() != nullptr) expr_mem_tracker_->Close(); + if (mem_tracker_.get() != nullptr) mem_tracker_->Close(); +} + +// TODO: codegen this function. +void Aggregator::InitAggSlots( + const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) { + vector<SlotDescriptor*>::const_iterator slot_desc = + intermediate_tuple_desc_->slots().begin() + num_grouping_exprs(); + for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) { + // To minimize branching on the UpdateTuple path, initialize the result value so that + // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for + // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can + // just start adding to the destination value (rather than repeatedly checking the + // destination NULL bit. The codegen'd version of UpdateSlot() exploits this to + // eliminate a branch per value. + // + // For boolean and numeric types, the default values are false/0, so the nullable + // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(), + // initialize the value to max/min possible value for the same effect. 
+ AggFnEvaluator* eval = agg_fn_evals[i]; + eval->Init(intermediate_tuple); + + DCHECK(agg_fns_[i] == &(eval->agg_fn())); + const AggFn* agg_fn = agg_fns_[i]; + const AggFn::AggregationOp agg_op = agg_fn->agg_op(); + if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) + && !agg_fn->intermediate_type().IsStringType() + && !agg_fn->intermediate_type().IsTimestampType()) { + ExprValue default_value; + void* default_value_ptr = nullptr; + if (agg_op == AggFn::MIN) { + default_value_ptr = default_value.SetToMax((*slot_desc)->type()); + } else { + DCHECK_EQ(agg_op, AggFn::MAX); + default_value_ptr = default_value.SetToMin((*slot_desc)->type()); + } + RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr); + } + } +} + +void Aggregator::UpdateTuple( + AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge) noexcept { + DCHECK(tuple != nullptr || agg_fns_.empty()); + for (int i = 0; i < agg_fns_.size(); ++i) { + if (is_merge) { + agg_fn_evals[i]->Merge(row->GetTuple(0), tuple); + } else { + agg_fn_evals[i]->Add(row, tuple); + } + } +} + +Tuple* Aggregator::GetOutputTuple( + const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) { + DCHECK(tuple != nullptr || agg_fn_evals.empty()) << tuple; + Tuple* dst = tuple; + if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) { + dst = Tuple::Create(output_tuple_desc_->byte_size(), pool); + } + if (needs_finalize_) { + AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst); + } else { + AggFnEvaluator::Serialize(agg_fn_evals, tuple); + } + // Copy grouping values from tuple to dst. + // TODO: Codegen this. 
+ if (dst != tuple) { + int num_grouping_slots = num_grouping_exprs(); + for (int i = 0; i < num_grouping_slots; ++i) { + SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i]; + SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i]; + bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset()); + void* src_slot = nullptr; + if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset()); + RawValue::Write(src_slot, dst, dst_slot_desc, nullptr); + } + } + return dst; +} + +// IR Generation for updating a single aggregation slot. Signature is: +// void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row) +// +// The IR for sum(double_col), which is constructed directly with the IRBuilder, is: +// +// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval, +// <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 { +// entry: +// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"** +// @_ZNK6impala14AggFnEvaluator11input_evalsEv( +// %"class.impala::AggFnEvaluator"* %agg_fn_eval) +// %0 = getelementptr %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0 +// %input_eval = load %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %0 +// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"* +// %input_eval, %"class.impala::TupleRow"* %row) +// %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>, +// <{ double, i8 }>* %agg_tuple, i32 0, i32 0 +// %dst_val = load double, double* %dst_slot_ptr +// %1 = extractvalue { i8, double } %input0, 0 +// %is_null = trunc i8 %1 to i1 +// br i1 %is_null, label %ret, label %not_null +// +// ret: ; preds = %not_null, %entry +// ret void +// +// not_null: ; preds = %entry +// %val = extractvalue { i8, double } %input0, 1 +// %2 = fadd double %dst_val, %val +// %3 = bitcast <{ double, i8 }>* %agg_tuple to i8* 
+//   %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
+//   %null_byte = load i8, i8* %null_byte_ptr
+//   %null_bit_cleared = and i8 %null_byte, -2
+//   store i8 %null_bit_cleared, i8* %null_byte_ptr
+//   store double %2, double* %dst_slot_ptr
+//   br label %ret
+// }
+//
+// The IR for ndv(timestamp_col), which uses the UDA interface, is:
+//
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//     <{ [1024 x i8] }>* %agg_tuple,
+//     %"class.impala::TupleRow"* %row) #39 {
+// entry:
+//   %dst_lowered_ptr = alloca { i64, i8* }
+//   %0 = alloca { i64, i64 }
+//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+//       %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %1
+//   %input0 = call { i64, i64 } @GetSlotRef(
+//       %"class.impala::ScalarExprEvaluator"* %input_eval,
+//       %"class.impala::TupleRow"* %row)
+//   %dst_slot_ptr = getelementptr inbounds <{ [1024 x i8] }>,
+//       <{ [1024 x i8] }>* %agg_tuple, i32 0, i32 0
+//   %2 = bitcast [1024 x i8]* %dst_slot_ptr to i8*
+//   %dst = insertvalue { i64, i8* } zeroinitializer, i8* %2, 1
+//   %3 = extractvalue { i64, i8* } %dst, 0
+//   %4 = and i64 %3, 4294967295
+//   %5 = or i64 %4, 4398046511104
+//   %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
+//   %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
+//       @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
+//       %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   store { i64, i64 } %input0, { i64, i64 }* %0
+//   %input_unlowered_ptr =
+//       bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
+//   store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
+//   %dst_unlowered_ptr =
+//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+//   call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
+//       %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//       %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
+//       %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
+//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+//   br label %ret
+//
+// ret:                                              ; preds = %entry
+//   ret void
+// }
+//
+Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+    SlotDescriptor* slot_desc, llvm::Function** fn) {
+  llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
+  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+  if (tuple_struct == nullptr) {
+    return Status("Aggregator::CodegenUpdateSlot(): failed to generate "
+                  "intermediate tuple desc");
+  }
+  llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[3];
+  *fn = prototype.GeneratePrototype(&builder, &args[0]);
+  llvm::Value* agg_fn_eval_arg = args[0];
+  llvm::Value* agg_tuple_arg = args[1];
+  llvm::Value* row_arg = args[2];
+
+  // Get the vector of input expressions' evaluators.
+  llvm::Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
+      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
+      "input_evals_vector");
+
+  AggFn* agg_fn = agg_fns_[agg_fn_idx];
+  const int num_inputs = agg_fn->GetNumChildren();
+  DCHECK_GE(num_inputs, 1);
+  vector<CodegenAnyVal> input_vals;
+  for (int i = 0; i < num_inputs; ++i) {
+    ScalarExpr* input_expr = agg_fn->GetChild(i);
+    llvm::Function* input_expr_fn;
+    RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn));
+    DCHECK(input_expr_fn != nullptr);
+
+    // Call input expr function with the matching evaluator to get src slot value.
+    llvm::Value* input_eval =
+        codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
+    string input_name = Substitute("input$0", i);
+    CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
+        input_expr->type(), input_expr_fn,
+        llvm::ArrayRef<llvm::Value*>({input_eval, row_arg}), input_name.c_str());
+    input_vals.push_back(input_val);
+  }
+
+  AggFn::AggregationOp agg_op = agg_fn->agg_op();
+  const ColumnType& dst_type = agg_fn->intermediate_type();
+  bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
+      || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
+  bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType();
+
+  llvm::BasicBlock* ret_block = llvm::BasicBlock::Create(codegen->context(), "ret", *fn);
+
+  // Emit the code to compute 'result' and set the NULL indicator if needed. First check
+  // for special cases where we can emit a very simple instruction sequence, then fall
+  // back to the general-purpose approach of calling the cross-compiled builtin UDA.
+  CodegenAnyVal& src = input_vals[0];
+
+  // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
+  llvm::Value* dst_slot_ptr = builder.CreateStructGEP(
+      nullptr, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
+  // TODO: consider moving the following codegen logic to AggFn.
+  if (agg_op == AggFn::COUNT) {
+    src.CodegenBranchIfNull(&builder, ret_block);
+    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+    llvm::Value* result = agg_fn->is_merge() ?
+        builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
+        builder.CreateAdd(dst_value, codegen->GetI64Constant(1), "count_inc");
+    builder.CreateStore(result, dst_slot_ptr);
+    DCHECK(!slot_desc->is_nullable());
+  } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
+    bool is_min = agg_op == AggFn::MIN;
+    src.CodegenBranchIfNull(&builder, ret_block);
+    codegen->CodegenMinMax(
+        &builder, slot_desc->type(), src.GetVal(), dst_slot_ptr, is_min, *fn);
+
+    // Dst may have been NULL, make sure to unset the NULL bit.
+    DCHECK(slot_desc->is_nullable());
+    slot_desc->CodegenSetNullIndicator(
+        codegen, &builder, agg_tuple_arg, codegen->false_value());
+  } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
+    src.CodegenBranchIfNull(&builder, ret_block);
+    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
+    llvm::Value* result = dst_type.IsFloatingPointType() ?
+        builder.CreateFAdd(dst_value, src.GetVal()) :
+        builder.CreateAdd(dst_value, src.GetVal());
+    builder.CreateStore(result, dst_slot_ptr);
+
+    if (slot_desc->is_nullable()) {
+      slot_desc->CodegenSetNullIndicator(
+          codegen, &builder, agg_tuple_arg, codegen->false_value());
+    } else {
+      // 'slot_desc' is not nullable if the aggregate function is sum_init_zero(),
+      // because the slot is initialized to be zero and the null bit is nonexistent.
+      DCHECK_EQ(agg_fn->fn_name(), "sum_init_zero");
+    }
+  } else {
+    // The remaining cases are implemented using the UDA interface.
+    // Create intermediate argument 'dst'; its value is loaded from 'dst_slot_ptr' below.
+    CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst");
+
+    // For a subset of builtins we generate a different code sequence that exploits two
+    // properties of the builtins. First, NULL input values can be skipped. Second, the
+    // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for
+    // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of
+    // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
+    bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
+        && !agg_fn->intermediate_type().IsTimestampType()
+        && (agg_op == AggFn::MIN || agg_op == AggFn::MAX || agg_op == AggFn::SUM
+               || agg_op == AggFn::AVG || agg_op == AggFn::NDV);
+    if (slot_desc->is_nullable()) {
+      if (special_null_handling) {
+        src.CodegenBranchIfNull(&builder, ret_block);
+        slot_desc->CodegenSetNullIndicator(
+            codegen, &builder, agg_tuple_arg, codegen->false_value());
+      } else {
+        dst.SetIsNull(slot_desc->CodegenIsNull(codegen, &builder, agg_tuple_arg));
+      }
+    }
+    dst.LoadFromNativePtr(dst_slot_ptr);
+
+    // Get the FunctionContext object for the AggFnEvaluator.
+    llvm::Function* get_agg_fn_ctx_fn =
+        codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
+    DCHECK(get_agg_fn_ctx_fn != nullptr);
+    llvm::Value* agg_fn_ctx_val =
+        builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
+
+    // Call the UDA to update/merge 'src' into 'dst', with the result stored in
+    // 'updated_dst_val'.
+    CodegenAnyVal updated_dst_val;
+    RETURN_IF_ERROR(CodegenCallUda(
+        codegen, &builder, agg_fn, agg_fn_ctx_val, input_vals, dst, &updated_dst_val));
+    // Copy the value back to the slot. In the FIXED_UDA_INTERMEDIATE case, the
+    // UDA function writes directly to the slot so there is nothing to copy.
+    if (dst_type.type != TYPE_FIXED_UDA_INTERMEDIATE) {
+      updated_dst_val.StoreToNativePtr(dst_slot_ptr);
+    }
+
+    if (slot_desc->is_nullable() && !special_null_handling) {
+      // Set NULL bit in the slot based on the return value.
+      llvm::Value* result_is_null = updated_dst_val.GetIsNull("result_is_null");
+      slot_desc->CodegenSetNullIndicator(
+          codegen, &builder, agg_tuple_arg, result_is_null);
+    }
+  }
+  builder.CreateBr(ret_block);
+
+  builder.SetInsertPoint(ret_block);
+  builder.CreateRetVoid();
+
+  // Avoid producing huge UpdateTuple() function after inlining - LLVM's optimiser
+  // memory/CPU usage scales super-linearly with function size.
+  // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to
+  // codegen because all the UpdateSlot() functions were inlined.
+  if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
+    codegen->SetNoInline(*fn);
+  }
+
+  *fn = codegen->FinalizeFunction(*fn);
+  if (*fn == nullptr) {
+    return Status("Aggregator::CodegenUpdateSlot(): codegen'd "
+                  "UpdateSlot() function failed verification, see log");
+  }
+  return Status::OK();
+}
+
+Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    AggFn* agg_fn, llvm::Value* agg_fn_ctx_val, const vector<CodegenAnyVal>& input_vals,
+    const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) {
+  llvm::Function* uda_fn;
+  RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));
+
+  // Set up arguments for call to UDA, which are the FunctionContext*, followed by
+  // pointers to all input values, followed by a pointer to the destination value.
+  vector<llvm::Value*> uda_fn_args;
+  uda_fn_args.push_back(agg_fn_ctx_val);
+
+  // Create pointers to input args to pass to uda_fn. We must use the unlowered type,
+  // e.g. IntVal, because the UDA interface expects the values to be passed as const
+  // references to the classes.
+  DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size());
+  for (int i = 0; i < input_vals.size(); ++i) {
+    uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr"));
+  }
+
+  // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the
+  // same reason as above.
+  llvm::Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
+  const ColumnType& dst_type = agg_fn->intermediate_type();
+  llvm::Type* dst_unlowered_ptr_type =
+      CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
+  llvm::Value* dst_unlowered_ptr = builder->CreateBitCast(
+      dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
+  uda_fn_args.push_back(dst_unlowered_ptr);
+
+  // Call 'uda_fn'
+  builder->CreateCall(uda_fn, uda_fn_args);
+
+  // Load the updated intermediate value (lowered AnyVal) back from 'dst_lowered_ptr'.
+  llvm::Value* anyval_result = builder->CreateLoad(dst_lowered_ptr, "anyval_result");
+
+  *updated_dst_val = CodegenAnyVal(codegen, builder, dst_type, anyval_result);
+  return Status::OK();
+}
+
+// IR codegen for the UpdateTuple loop. This loop is query specific and based on the
+// aggregate functions. The function signature must match the non-codegen'd UpdateTuple
+// exactly.
+// For the query:
+// select count(*), count(int_col), sum(double_col) the IR looks like:
+//
+// define void @UpdateTuple(%"class.impala::Aggregator"* %this_ptr,
+//     %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
+//     %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
+// entry:
+//   %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
+//   %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
+//       <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
+//   %count_star_val = load i64, i64* %src_slot
+//   %count_star_inc = add i64 %count_star_val, 1
+//   store i64 %count_star_inc, i64* %src_slot
+//   %0 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
+//   %agg_fn_eval =
+//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
+//   call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+//   %1 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
+//   %agg_fn_eval2 =
+//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
+//   call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+//   ret void
+// }
+//
+Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
+  for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
+    if (slot_desc->type().type == TYPE_CHAR) {
+      return Status::Expected("Aggregator::CodegenUpdateTuple(): cannot "
+                              "codegen CHAR in aggregations");
+    }
+  }
+
+  if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == nullptr) {
+    return Status::Expected("Aggregator::CodegenUpdateTuple(): failed to"
+                            " generate intermediate tuple desc");
+  }
+
+  // Get the types matching the signature of the non-codegen'd UpdateTuple().
+  llvm::PointerType* agg_node_ptr_type = codegen->GetStructPtrType<Aggregator>();
+  llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
+  llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
+  llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
+  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
+
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[5];
+  *fn = prototype.GeneratePrototype(&builder, &args[0]);
+  llvm::Value* agg_fn_evals_arg = args[1];
+  llvm::Value* tuple_arg = args[2];
+  llvm::Value* row_arg = args[3];
+
+  // Cast the parameter types to the internal llvm runtime types.
+  // TODO: get rid of this by using right type in function signature
+  tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
+
+  // Loop over each aggregate function and generate the IR for its slot. count(*)
+  // is incremented inline; the others call a generated UpdateSlot() helper.
+  int j = num_grouping_exprs();
+  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
+    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
+    AggFn* agg_fn = agg_fns_[i];
+    if (agg_fn->is_count_star()) {
+      // TODO: we should be able to hoist this up to the loop over the batch and just
+      // increment the slot by the number of rows in the batch.
+      int field_idx = slot_desc->llvm_field_idx();
+      llvm::Value* const_one = codegen->GetI64Constant(1);
+      llvm::Value* slot_ptr =
+          builder.CreateStructGEP(nullptr, tuple_arg, field_idx, "src_slot");
+      llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
+      llvm::Value* count_inc =
+          builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
+      builder.CreateStore(count_inc, slot_ptr);
+    } else {
+      llvm::Function* update_slot_fn;
+      RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
+
+      // Load agg_fn_evals[i] from the 'agg_fn_evals' function argument.
+      llvm::Value* agg_fn_eval_val =
+          codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
+
+      // Call UpdateSlot(agg_fn_evals[i], tuple, row);
+      llvm::Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
+      builder.CreateCall(update_slot_fn, update_slot_args);
+    }
+  }
+  builder.CreateRetVoid();
+
+  // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
+  // any benefit from it since the function call overhead will be amortized.
+  if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+    codegen->SetNoInline(*fn);
+  }
+
+  // CodegenProcessBatch() does the final optimizations.
+  *fn = codegen->FinalizeFunction(*fn);
+  if (*fn == nullptr) {
+    return Status("Aggregator::CodegenUpdateTuple(): codegen'd "
+                  "UpdateTuple() function failed verification, see log");
+  }
+  return Status::OK();
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
new file mode 100644
index 0000000..ab13d45
--- /dev/null
+++ b/be/src/exec/aggregator.h
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_AGGREGATOR_H
+#define IMPALA_EXEC_AGGREGATOR_H
+
+#include <vector>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "util/runtime-profile.h"
+
+namespace llvm {
+class Function;
+class Value;
+} // namespace llvm
+
+namespace impala {
+
+class AggFn;
+class AggFnEvaluator;
+class CodegenAnyVal;
+class DescriptorTbl;
+class ExecNode;
+class LlvmBuilder;
+class LlvmCodeGen;
+class MemPool;
+class MemTracker;
+class ObjectPool;
+class RowBatch;
+class RowDescriptor;
+class RuntimeState;
+class ScalarExpr;
+class ScalarExprEvaluator;
+class SlotDescriptor;
+class TPlanNode;
+class Tuple;
+class TupleDescriptor;
+class TupleRow;
+
+/// Base class for aggregating rows. Used in the AggregationNode and
+/// StreamingAggregationNode.
+///
+/// Rows are added by calling AddBatch(). Once all rows have been added, InputDone() must
+/// be called and the results can be fetched with GetNext().
+class Aggregator {
+ public:
+  Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
+      const DescriptorTbl& descs, const std::string& name);
+  virtual ~Aggregator();
+
+  /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
+  /// before GetNext() rows should be added with AddBatch(), followed by InputDone().
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual void Codegen(RuntimeState* state) = 0;
+  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status GetNext(
+      RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
+  virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT = 0;
+  virtual void Close(RuntimeState* state);
+
+  /// Adds all of the rows in 'batch' to the aggregation.
+  virtual Status AddBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT = 0;
+  /// Indicates that all batches have been added. Must be called before GetNext().
+  virtual Status InputDone() WARN_UNUSED_RESULT = 0;
+
+  virtual int num_grouping_exprs() = 0;
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
+
+  virtual void SetDebugOptions(const TDebugOptions& debug_options) = 0;
+
+  virtual std::string DebugString(int indentation_level = 0) const = 0;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const = 0;
+
+  static const char* LLVM_CLASS_NAME;
+
+ protected:
+  /// The id of the ExecNode this Aggregator corresponds to.
+  int id_;
+  ObjectPool* pool_;
+
+  /// Accounts for peak memory used by this aggregator.
+  std::unique_ptr<MemTracker> mem_tracker_;
+
+  /// MemTracker used by 'expr_perm_pool_' and 'expr_results_pool_'.
+  std::unique_ptr<MemTracker> expr_mem_tracker_;
+
+  /// MemPool for allocations made by expression evaluators in this aggregator that are
+  /// "permanent" and live until Close() is called. Created in Prepare().
+  std::unique_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this aggregator that hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory. QueryMaintenance() clears this pool, but
+  /// it may be appropriate for Aggregator implementations to clear it at other points in
+  /// execution where the memory is not needed.
+  std::unique_ptr<MemPool> expr_results_pool_;
+
+  /// Tuple into which Update()/Merge()/Serialize() results are stored.
+  TupleId intermediate_tuple_id_;
+  TupleDescriptor* intermediate_tuple_desc_;
+
+  /// Tuple into which Finalize() results are stored. Possibly the same as
+  /// the intermediate tuple.
+  TupleId output_tuple_id_;
+  TupleDescriptor* output_tuple_desc_;
+
+  /// The RowDescriptor for the exec node this aggregator corresponds to.
+  const RowDescriptor& row_desc_;
+  /// The RowDescriptor for the child of the exec node this aggregator corresponds to.
+  const RowDescriptor& input_row_desc_;
+
+  /// Certain aggregates require a finalize step, which is the final step of the
+  /// aggregate after consuming all input rows. The finalize step converts the aggregate
+  /// value into its final form. This is true if this aggregator contains an aggregate
+  /// that requires a finalize step.
+  const bool needs_finalize_;
+
+  /// The list of all aggregate operations for this aggregator.
+  std::vector<AggFn*> agg_fns_;
+
+  /// Evaluators for each aggregate function. If this is a grouping aggregation, these
+  /// evaluators are only used to create cloned per-partition evaluators. The cloned
+  /// evaluators are then used to evaluate the functions. If this is a non-grouping
+  /// aggregation these evaluators are used directly to evaluate the functions.
+  ///
+  /// Permanent and result allocations for these evaluators are allocated from
+  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
+  std::vector<AggFnEvaluator*> agg_fn_evals_;
+
+  /// Conjuncts and their evaluators in this aggregator. 'conjuncts_' live in the
+  /// query-state's object pool while the evaluators live in this aggregator's
+  /// object pool.
+  std::vector<ScalarExpr*> conjuncts_;
+  std::vector<ScalarExprEvaluator*> conjunct_evals_;
+
+  /// Runtime profile for this aggregator. Owned by 'pool_'.
+  RuntimeProfile* const runtime_profile_;
+
+  int64_t num_rows_returned_;
+  RuntimeProfile::Counter* rows_returned_counter_;
+
+  /// Time spent processing the child rows.
+  RuntimeProfile::Counter* build_timer_;
+
+  /// Initializes the aggregate function slots of an intermediate tuple.
+  /// Any var-len data is allocated from the FunctionContexts.
+  void InitAggSlots(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple);
+
+  /// Updates the given aggregation intermediate tuple with aggregation values computed
+  /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
+  /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
+  /// in is_merge == true. The override is needed to merge spilled and non-spilled rows
+  /// belonging to the same partition independent of whether the agg fn evaluators have
+  /// is_merge() == true.
+  /// This function is replaced by codegen (which is why we don't use a vector argument
+  /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
+  /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
+  void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
+      bool is_merge = false) noexcept;
+
+  /// Called on the intermediate tuple of each group after all input rows have been
+  /// consumed and aggregated. Computes the final aggregate values to be returned in
+  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
+  /// For the Finalize() case if the output tuple is different from the intermediate
+  /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
+  /// Grouping values are copied into the output tuple and the output tuple holding
+  /// the finalized/serialized aggregate values is returned.
+  /// TODO: Coordinate the allocation of new tuples with the release of memory
+  /// so as not to make memory consumption blow up.
+  Tuple* GetOutputTuple(
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool);
+
+  /// Codegens a function that updates slot 'slot_desc' for the aggregate function
+  /// agg_fns_[agg_fn_idx] and returns it in 'fn'. Returns non-OK status if codegen
+  /// is unsuccessful.
+  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  /// Codegen a call to a function implementing the UDA interface with input values
+  /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
+  /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
+  /// operation is applied. The instruction sequence for the UDA call is inserted at
+  /// the insert position of 'builder'.
+  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
+      llvm::Value* agg_fn_ctx_val, const std::vector<CodegenAnyVal>& input_vals,
+      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
+
+  /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
+  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
+};
+} // namespace impala
+
+#endif // IMPALA_EXEC_AGGREGATOR_H

http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index eeefaed..384d65d 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -29,6 +29,7 @@
 #include "common/status.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exec/aggregation-node.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/cardinality-check-node.h"
 #include "exec/data-source-scan-node.h"
@@ -42,11 +43,11 @@
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
 #include "exec/partial-sort-node.h"
-#include "exec/partitioned-aggregation-node.h"
 #include "exec/partitioned-hash-join-node.h"
 #include "exec/select-node.h"
 #include "exec/singular-row-src-node.h"
 #include "exec/sort-node.h"
+#include "exec/streaming-aggregation-node.h"
 #include "exec/subplan-node.h"
 #include "exec/topn-node.h"
 #include "exec/union-node.h"
@@ -303,7 +304,11 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
+      if (tnode.agg_node.use_streaming_preaggregation) {
+        *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
+      } else {
+        *node = pool->Add(new AggregationNode(pool, tnode, descs));
+      }
       break;
     case TPlanNodeType::HASH_JOIN_NODE:
       *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ad9ae10..9a87a56 100644
--- a/be/src/exec/exec-node.h
+++
b/be/src/exec/exec-node.h @@ -211,6 +211,7 @@ class ExecNode { MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); } MemPool* expr_perm_pool() { return expr_perm_pool_.get(); } MemPool* expr_results_pool() { return expr_results_pool_.get(); } + const TBackendResourceProfile& resource_profile() { return resource_profile_; } bool is_closed() const { return is_closed_; } /// Return true if codegen was disabled by the planner for this ExecNode. Does not @@ -220,6 +221,10 @@ class ExecNode { /// Extract node id from p->name(). static int GetNodeIdFromProfile(RuntimeProfile* p); + /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode. + /// Valid to call in or after Prepare(). + bool IsInSubplan() const { return containing_subplan_ != NULL; } + /// Names of counters shared by all exec nodes static const std::string ROW_THROUGHPUT_COUNTER; @@ -322,10 +327,6 @@ class ExecNode { /// Set by SubplanNode::Init(). Not owned. SubplanNode* containing_subplan_; - /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode. - /// Valid to call in or after Prepare(). - bool IsInSubplan() const { return containing_subplan_ != NULL; } - /// If true, codegen should be disabled for this exec node. const bool disable_codegen_; http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc new file mode 100644 index 0000000..d3dbf17 --- /dev/null +++ b/be/src/exec/grouping-aggregator-ir.cc @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/grouping-aggregator.h" + +#include "exec/hash-table.inline.h" +#include "exprs/agg-fn-evaluator.h" +#include "runtime/row-batch.h" +#include "runtime/tuple-row.h" + +using namespace impala; + +template <bool AGGREGATED_ROWS> +Status GroupingAggregator::AddBatchImpl(RowBatch* batch, + TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) { + DCHECK(!hash_partitions_.empty()); + DCHECK(!is_streaming_preagg_); + + // Make sure that no resizes will happen when inserting individual rows to the hash + // table of each partition by pessimistically assuming that all the rows in each batch + // will end up to the same partition. + // TODO: Once we have a histogram with the number of rows per partition, we will have + // accurate resize calls. 
+ RETURN_IF_ERROR( + CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx)); + + HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + const int cache_size = expr_vals_cache->capacity(); + const int num_rows = batch->num_rows(); + for (int group_start = 0; group_start < num_rows; group_start += cache_size) { + EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx); + + FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) { + RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx)); + expr_vals_cache->NextRow(); + } + DCHECK(expr_vals_cache->AtEnd()); + } + return Status::OK(); +} +/* Evaluates and hashes up to expr_vals_cache->capacity() rows of 'batch', starting at + * 'start_row_idx', storing the results in ht_ctx's expression values cache. Uses + * EvalAndHashBuild() if AGGREGATED_ROWS is true and EvalAndHashProbe() otherwise; rows + * for which that call returns false are marked null in the cache. Unless + * 'prefetch_mode' is NONE, the hash table bucket of each non-null row is prefetched + * when the row's partition still has an in-memory hash table. On return the cache has + * been reset for reading. + */ +template <bool AGGREGATED_ROWS> +void IR_ALWAYS_INLINE GroupingAggregator::EvalAndHashPrefetchGroup(RowBatch* batch, + int start_row_idx, TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) { + HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + const int cache_size = expr_vals_cache->capacity(); + + expr_vals_cache->Reset(); + FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) { + TupleRow* row = batch_iter.Get(); + bool is_null; + if (AGGREGATED_ROWS) { + is_null = !ht_ctx->EvalAndHashBuild(row); + } else { + is_null = !ht_ctx->EvalAndHashProbe(row); + } + // Hoist lookups out of non-null branch to speed up non-null case.
+ const uint32_t hash = expr_vals_cache->CurExprValuesHash(); + const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); + HashTable* hash_tbl = GetHashTable(partition_idx); + if (is_null) { + expr_vals_cache->SetRowNull(); + } else if (prefetch_mode != TPrefetchMode::NONE) { + if (LIKELY(hash_tbl != nullptr)) hash_tbl->PrefetchBucket<false>(hash); + } + expr_vals_cache->NextRow(); + } + + expr_vals_cache->ResetForRead(); +} +/* Aggregates 'row', whose values must already have been evaluated and hashed into the + * current position of ht_ctx's expression values cache (see EvalAndHashPrefetchGroup()). + * Rows marked null in the cache are skipped. The top NUM_PARTITIONING_BITS bits of the + * hash select the destination partition. If that partition is spilled, 'row' is handed + * to AppendSpilledRow(). Otherwise, if a matching hash table entry exists, 'row' is + * merged into it (only expected for unaggregated input); if not, it is added as a new + * intermediate tuple by AddIntermediateTuple(). + */ +template <bool AGGREGATED_ROWS> +Status GroupingAggregator::ProcessRow( + TupleRow* __restrict__ row, HashTableCtx* __restrict__ ht_ctx) { + HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + // Hoist lookups out of non-null branch to speed up non-null case. + const uint32_t hash = expr_vals_cache->CurExprValuesHash(); + const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); + if (expr_vals_cache->IsRowNull()) return Status::OK(); + // To process this row, we first see if it can be aggregated or inserted into this + // partition's hash table. If we need to insert it and that fails, due to OOM, we + // spill the partition. The partition to spill is not necessarily dst_partition, + // so we can try again to insert the row. + HashTable* hash_tbl = GetHashTable(partition_idx); + Partition* dst_partition = hash_partitions_[partition_idx]; + DCHECK(dst_partition != nullptr); + DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == nullptr); + if (hash_tbl == nullptr) { + // This partition is already spilled, just append the row. + return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row); + } + + DCHECK(dst_partition->aggregated_row_stream->is_pinned()); + bool found; + // Find the appropriate bucket in the hash table. There will always be a free + // bucket because the batch-level caller resizes the hash tables with + // CheckAndResizeHashPartitions() before processing any of the batch's rows.
+ HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); + DCHECK(!it.AtEnd()) << "Hash table had no free buckets"; + if (AGGREGATED_ROWS) { + // If the row is already an aggregate row, it cannot match anything in the + // hash table since we process the aggregate rows first. These rows should + // have been aggregated in the initial pass. + DCHECK(!found); + } else if (found) { + // Row is already in hash table. Do the aggregation and we're done. + UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row); + return Status::OK(); + } + + // If we are seeing this result row for the first time, we need to construct the + // result row and initialize it. + return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it); +} +/* Constructs a new intermediate tuple in 'partition's aggregated_row_stream, updates it + * with 'row' and inserts it into the hash table at 'insert_it' using 'hash'. If the + * tuple cannot be constructed but no error was reported (not enough memory), a + * partition is spilled with SpillPartition() and the insertion is retried; if + * 'partition' itself ended up spilled, 'row' is passed to AppendSpilledRow() instead. + * Returns an error if tuple construction, spilling or appending fails. + */ +template <bool AGGREGATED_ROWS> +Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partition, + TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) { + while (true) { + DCHECK(partition->aggregated_row_stream->is_pinned()); + Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, + partition->aggregated_row_stream.get(), &add_batch_status_); + + if (LIKELY(intermediate_tuple != nullptr)) { + UpdateTuple( + partition->agg_fn_evals.data(), intermediate_tuple, row, AGGREGATED_ROWS); + // After copying and initializing the tuple, insert it into the hash table. + insert_it.SetTuple(intermediate_tuple, hash); + return Status::OK(); + } else if (!add_batch_status_.ok()) { + return std::move(add_batch_status_); + } + + // We did not have enough memory to add intermediate_tuple to the stream. + // Spill a partition (not necessarily this one) to free memory and retry, unless + // 'partition' itself was the one that got spilled.
+ RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); + if (partition->is_spilled()) { + return AppendSpilledRow<AGGREGATED_ROWS>(partition, row); + } + } +} +/* Streaming preaggregation of 'in_batch'; requires is_streaming_preagg_, and 'out_batch' + * must be empty on entry. Rows marked null in the expression values cache are skipped. + * Every other row is offered to TryAddToHashTable() for its partition, which may consume + * that partition's entry in 'remaining_capacity'. Each row it does not take gets a new + * intermediate tuple, allocated from out_batch's tuple data pool and updated with the + * row, which is appended to 'out_batch'. If 'needs_serialize' is true, the tuples in + * 'out_batch' are serialized before returning. Returns an error if constructing an + * intermediate tuple reports one. + */ +Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize, + TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch, + HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) { + DCHECK(is_streaming_preagg_); + DCHECK_EQ(out_batch->num_rows(), 0); + DCHECK_LE(in_batch->num_rows(), out_batch->capacity()); + + RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows()); + HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + const int num_rows = in_batch->num_rows(); + const int cache_size = expr_vals_cache->capacity(); + for (int group_start = 0; group_start < num_rows; group_start += cache_size) { + EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx); + + FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) { + // Hoist lookups out of non-null branch to speed up non-null case. + TupleRow* in_row = in_batch_iter.Get(); + const uint32_t hash = expr_vals_cache->CurExprValuesHash(); + const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); + if (!expr_vals_cache->IsRowNull() + && !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], + GetHashTable(partition_idx), in_row, hash, + &remaining_capacity[partition_idx], &add_batch_status_)) { + RETURN_IF_ERROR(std::move(add_batch_status_)); + // Tuple is not going into hash table, add it to the output batch. + // (Reaching here means TryAddToHashTable() returned false with an OK status.)
+ Tuple* intermediate_tuple = ConstructIntermediateTuple( + agg_fn_evals_, out_batch->tuple_data_pool(), &add_batch_status_); + if (UNLIKELY(intermediate_tuple == nullptr)) { + DCHECK(!add_batch_status_.ok()); + return std::move(add_batch_status_); + } + UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row); + out_batch_iterator.Get()->SetTuple(0, intermediate_tuple); + out_batch_iterator.Next(); + out_batch->CommitLastRow(); + } + DCHECK(add_batch_status_.ok()); + expr_vals_cache->NextRow(); + } + DCHECK(expr_vals_cache->AtEnd()); + } + if (needs_serialize) { + FOREACH_ROW(out_batch, 0, out_batch_iter) { + AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0)); + } + } + + return Status::OK(); +} +/* Tries to aggregate the unaggregated row 'in_row' into 'partition's in-memory hash + * table 'hash_tbl'. A matching entry is updated in place. Otherwise, if + * '*remaining_capacity' is non-zero, a new intermediate tuple is constructed in the + * partition's aggregated_row_stream, inserted with 'hash' and updated with the row, + * and '*remaining_capacity' is decremented. Returns true if the row was aggregated + * into the hash table. Returns false if '*remaining_capacity' is zero, or if the tuple + * could not be constructed; in the latter case '*remaining_capacity' is set to zero + * and '*status' holds whatever ConstructIntermediateTuple() reported. + */ +bool GroupingAggregator::TryAddToHashTable(HashTableCtx* __restrict__ ht_ctx, + Partition* __restrict__ partition, HashTable* __restrict__ hash_tbl, + TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__ remaining_capacity, + Status* status) { + DCHECK(remaining_capacity != nullptr); + DCHECK_EQ(hash_tbl, partition->hash_tbl.get()); + DCHECK_GE(*remaining_capacity, 0); + bool found; + // This is called from AddBatchStreamingImpl() so the rows are not aggregated. + HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); + Tuple* intermediate_tuple; + if (found) { + intermediate_tuple = it.GetTuple(); + } else if (*remaining_capacity == 0) { + return false; + } else { + intermediate_tuple = ConstructIntermediateTuple( + partition->agg_fn_evals, partition->aggregated_row_stream.get(), status); + if (LIKELY(intermediate_tuple != nullptr)) { + it.SetTuple(intermediate_tuple, hash); + --(*remaining_capacity); + } else { + // Avoid repeatedly trying to add tuples when under memory pressure. + *remaining_capacity = 0; + return false; + } + } + + UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row); + return true; +} + +// Explicitly instantiate AddBatchImpl() for both values of AGGREGATED_ROWS.
+template Status GroupingAggregator::AddBatchImpl<false>( + RowBatch*, TPrefetchMode::type, HashTableCtx*); +template Status GroupingAggregator::AddBatchImpl<true>( + RowBatch*, TPrefetchMode::type, HashTableCtx*); http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/grouping-aggregator-partition.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc new file mode 100644 index 0000000..8fe08f4 --- /dev/null +++ b/be/src/exec/grouping-aggregator-partition.cc @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "exec/grouping-aggregator.h" + +#include <set> +#include <sstream> + +#include "exec/exec-node.h" +#include "exec/hash-table.inline.h" +#include "exprs/agg-fn-evaluator.h" +#include "runtime/descriptors.h" +#include "runtime/mem-pool.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "util/runtime-profile-counters.h" + +#include "gen-cpp/PlanNodes_types.h" + +namespace impala { +/* A partition must have been closed with Close() before it is destroyed. */ +GroupingAggregator::Partition::~Partition() { + DCHECK(is_closed); +} +/* Sets up this partition for aggregation: shallow-clones the parent's aggregate + * function evaluators into 'agg_fn_evals' (backed by a new 'agg_fn_perm_pool'), + * creates the pinned aggregated_row_stream and prepares it for writing, and, unless + * the parent is a streaming preaggregation, creates an unpinned + * unaggregated_row_stream that is given no write buffer until the partition spills. + * Returns an error if initializing or preparing a stream fails. + */ +Status GroupingAggregator::Partition::InitStreams() { + agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker_.get())); + DCHECK_EQ(agg_fn_evals.size(), 0); + AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(), + parent->expr_results_pool_.get(), parent->agg_fn_evals_, &agg_fn_evals); + // Varlen aggregate function results are stored outside of aggregated_row_stream because + // BufferedTupleStream doesn't support relocating varlen data stored in the stream. + // Only the slots after the first grouping_exprs_.size() slots (presumably the + // grouping keys) are examined. + auto agg_slot = + parent->intermediate_tuple_desc_->slots().begin() + parent->grouping_exprs_.size(); + std::set<SlotId> external_varlen_slots; + for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) { + if ((*agg_slot)->type().IsVarLenStringType()) { + external_varlen_slots.insert((*agg_slot)->id()); + } + } + + aggregated_row_stream.reset( + new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_, + parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.max_row_buffer_size, external_varlen_slots)); + RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id_, true)); + bool got_buffer; + RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n" + << parent->buffer_pool_client()->DebugString() << "\n" + << parent->DebugString(2); + if (!parent->is_streaming_preagg_) { + unaggregated_row_stream.reset(
+ new BufferedTupleStream(parent->state_, &parent->input_row_desc_, + parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.max_row_buffer_size)); + // This stream is only used to spill, no need to ever have this pinned. + RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id_, false)); + // Save memory by waiting until we spill to allocate the write buffer for the + // unaggregated row stream. + DCHECK(!unaggregated_row_stream->has_write_iterator()); + } + return Status::OK(); +} +/* Creates and initializes this partition's hash table. aggregated_row_stream must + * already exist and the hash table must not. 'got_memory' is passed through to + * HashTable::Init(), whose status is returned. + */ +Status GroupingAggregator::Partition::InitHashTable(bool* got_memory) { + DCHECK(aggregated_row_stream != nullptr); + DCHECK(hash_tbl == nullptr); + // We use the upper NUM_PARTITIONING_BITS bits of the hash to pick the partition so + // only the remaining bits can be used for the hash table. + // TODO: we could switch to 64 bit hashes and then we don't need a max size. + // It might be reasonable to limit individual hash table size for other reasons + // though. Always start with small buffers. + hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr, + 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); + // Please update the error message in CreateHashPartitions() if initial size of + // hash table changes. + return hash_tbl->Init(got_memory); +} +/* Only does work if the parent needs_serialize_. In that case every intermediate tuple + * in the hash table is serialized and copied into the parent's (unpinned) + * serialize_stream_, that stream replaces aggregated_row_stream, and a fresh + * serialize_stream_ with a write buffer is created for the next spill. On error the + * hash table is closed and reset before returning; if copying a row failed, the + * not-yet-copied tuples are cleaned up and the old aggregated_row_stream is closed as + * well. Must not be called for streaming preaggregations. + */ +Status GroupingAggregator::Partition::SerializeStreamForSpilling() { + DCHECK(!parent->is_streaming_preagg_); + if (parent->needs_serialize_) { + // We need to do a lot more work in this case. This step effectively does a merge + // aggregation in this node. We need to serialize the intermediates, spill the + // intermediates and then feed them into the aggregate function's merge step. + // This is often used when the intermediate is a string type, meaning the current + // (before serialization) in-memory layout is not the on-disk block layout. + // The disk layout does not support mutable rows. We need to rewrite the stream + // into the on disk format.
+ // TODO: if it happens to not be a string, we could serialize in place. This is + // a future optimization since it is very unlikely to have a serialize phase + // for those UDAs. + DCHECK(parent->serialize_stream_.get() != nullptr); + DCHECK(!parent->serialize_stream_->is_pinned()); + + // Serialize each intermediate tuple in the hash table and copy it into the new + // (unpinned) stream. + Status status; + BufferedTupleStream* new_stream = parent->serialize_stream_.get(); + HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); + while (!it.AtEnd()) { + Tuple* tuple = it.GetTuple(); + // Advance first so that 'it' points at the first tuple not yet serialized; that + // is the iterator handed to CleanupHashTbl() if AddRow() fails below. + it.Next(); + AggFnEvaluator::Serialize(agg_fn_evals, tuple); + if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) { + DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error"; + // Even if we can't add to new_stream, finish up processing this agg stream to + // make clean up easier (someone has to finalize this stream and we don't want to + // remember where we are). + parent->CleanupHashTbl(agg_fn_evals, it); + hash_tbl->Close(); + hash_tbl.reset(); + aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + return status; + } + } + + aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + aggregated_row_stream.swap(parent->serialize_stream_); + // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for + // when we need to spill again. We need to have this available before we need + // to spill to make sure it is available. This should be acquirable since we just + // freed at least one buffer from this partition's (old) aggregated_row_stream.
+ parent->serialize_stream_.reset( + new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_, + parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.max_row_buffer_size)); + status = parent->serialize_stream_->Init(parent->id_, false); + if (status.ok()) { + bool got_buffer; + status = parent->serialize_stream_->PrepareForWrite(&got_buffer); + DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation"; + } + if (!status.ok()) { + hash_tbl->Close(); + hash_tbl.reset(); + return status; + } + DCHECK(parent->serialize_stream_->has_write_iterator()); + } + return Status::OK(); +} +/* Spills this partition. Calls StartSpilling() on the runtime state, serializes the + * hash table contents if needed (SerializeStreamForSpilling()), then frees the + * aggregate function evaluators, 'agg_fn_perm_pool' and the hash table. If + * 'more_aggregate_rows' is true, all of aggregated_row_stream except its current write + * buffer is unpinned so aggregated rows can still be appended; otherwise + * aggregated_row_stream is fully unpinned and unaggregated_row_stream is prepared for + * writing. Also bumps the parent's spilled-partition counter and, on the first spill, + * adds "Spilled" to the profile's exec options. Must not be called for streaming + * preaggregations or on closed or already spilled partitions. + */ +Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) { + DCHECK(!parent->is_streaming_preagg_); + DCHECK(!is_closed); + DCHECK(!is_spilled()); + RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker_.get())); + + RETURN_IF_ERROR(SerializeStreamForSpilling()); + + // Free the in-memory result data. + AggFnEvaluator::Close(agg_fn_evals, parent->state_); + agg_fn_evals.clear(); + + if (agg_fn_perm_pool.get() != nullptr) { + agg_fn_perm_pool->FreeAll(); + agg_fn_perm_pool.reset(); + } + + hash_tbl->Close(); + hash_tbl.reset(); + + // Unpin the stream to free memory, but leave a write buffer in place so we can + // continue appending rows to one of the streams in the partition.
+ DCHECK(aggregated_row_stream->has_write_iterator()); + DCHECK(!unaggregated_row_stream->has_write_iterator()); + if (more_aggregate_rows) { + aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + } else { + aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); + bool got_buffer; + RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) << "Accounted in min reservation" + << parent->buffer_pool_client()->DebugString(); + } + + COUNTER_ADD(parent->num_spilled_partitions_, 1); + if (parent->num_spilled_partitions_->value() == 1) { + parent->runtime_profile()->AppendExecOption("Spilled"); + } + return Status::OK(); +} +/* Releases the resources held by the partition. Later calls are no-ops (guarded by + * 'is_closed'). If 'finalize_rows' is true and the hash table still exists, + * CleanupHashTbl() is run from the start of the hash table before + * aggregated_row_stream is closed. Then closes the hash table, unaggregated_row_stream + * and the evaluators in 'agg_fn_evals', and frees 'agg_fn_perm_pool', skipping any + * that are absent. + */ +void GroupingAggregator::Partition::Close(bool finalize_rows) { + if (is_closed) return; + is_closed = true; + if (aggregated_row_stream.get() != nullptr) { + if (finalize_rows && hash_tbl.get() != nullptr) { + // We need to walk all the rows and Finalize them here so the UDA gets a chance + // to cleanup. If the hash table is gone (meaning this was spilled), the rows + // should have been finalized/serialized in Spill(). + parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get())); + } + aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + } + if (hash_tbl.get() != nullptr) hash_tbl->Close(); + if (unaggregated_row_stream.get() != nullptr) { + unaggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + } + for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_); + if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll(); +} +} // namespace impala
