IMPALA-110: Support for multiple DISTINCT

This patch adds support for having multiple aggregate functions in a single SELECT block that use DISTINCT over different sets of columns.

Planner design:
- The existing tree-based plan shape with a two-phased aggregation is maintained.
- Existing plans are not changed.
- Aggregates are grouped into 'aggregation classes' based on their expressions in the distinct portion which may be empty for non-distinct aggregates.
- The aggregation framework is generalized to simultaneously process multiple aggregation classes within the tree-based plan. This process splits the results of different aggregation classes into separate rows, so a final aggregation is needed to transpose the results into the desired form.
- Main challenge: Each aggregation class consumes and produces different tuples, so conceptually a union-type of tuples flows through the runtime. The tuple union is represented by a TupleRow with one tuple per aggregation class. Only one tuple in such a TupleRow is non-NULL.
- Backend exec nodes in the aggregation plan will be aware of this tuple-union either explicitly in their implementation or by relying on expressions that distinguish the aggregation classes.
- To distinguish the aggregation classes, e.g. in hash exchanges, CASE expressions are crafted to hash/group on the appropriate slots.

Deferred FE work:
- Beautify/condense the long CASE exprs
- Push applicable conjuncts into individual aggregators before the transposition step
- Added a few testing TODOs to reduce the size of this patch
- Decide whether we want to change existing plans to the new model

Execution design:
- Previous patches separated out aggregation logic from the exec node into Aggregators. This is extended to support multiple Aggregators per node, with different grouping and aggregating functions.
- There is a fast path for aggregations with only one aggregator, which leaves the execution essentially unchanged from before.
- When there are multiple aggregators, the first aggregation node in the plan replicates its input to each aggregator. The output of this step is rows where only a single tuple is non-null, corresponding to the aggregator that produced the row.
- A new expr is introduced, ValidTupleId, which takes one of these rows and returns which tuple is non-null.
- For additional aggregation nodes, the input is split apart into 'mini-batches' according to which aggregator the row corresponds to.

Testing:
- Added analyzer and planner tests
- Added end-to-end queries tests
- Ran hdfs/core tests
- Added support in the query generator and ran in a loop.

Change-Id: I055402eaef6d81e5f70e850d9f8a621e766830a4
Reviewed-on: http://gerrit.cloudera.org:8080/10771
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/df53ec23
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/df53ec23
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/df53ec23

Branch: refs/heads/master
Commit: df53ec2385190bba2b3cefb43b094cde6d33642f
Parents: 91673fe
Author: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
Authored: Tue May 15 16:41:18 2018 +0000
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Sep 26 03:54:49 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py | 2 +-
 be/src/exec/CMakeLists.txt | 1 +
 be/src/exec/aggregation-node-base.cc | 115 ++
 be/src/exec/aggregation-node-base.h | 67 +
 be/src/exec/aggregation-node.cc | 110 +-
 be/src/exec/aggregation-node.h | 19 +-
 be/src/exec/aggregator.cc | 34 +-
 be/src/exec/aggregator.h | 38 +-
 be/src/exec/exec-node.cc | 2 +-
 be/src/exec/grouping-aggregator-ir.cc | 16 +-
 be/src/exec/grouping-aggregator.cc | 63 +-
 be/src/exec/grouping-aggregator.h | 78 +-
 be/src/exec/non-grouping-aggregator.cc | 17 +-
 be/src/exec/non-grouping-aggregator.h | 20 +-
 be/src/exec/streaming-aggregation-node.cc | 136 +-
 be/src/exec/streaming-aggregation-node.h | 23 +-
 be/src/exprs/CMakeLists.txt | 1 +
 be/src/exprs/aggregate-functions-ir.cc | 93 ++
 be/src/exprs/aggregate-functions.h | 10 +
 be/src/exprs/scalar-expr.cc | 4 +
 be/src/exprs/valid-tuple-id.cc | 67 +
 be/src/exprs/valid-tuple-id.h | 52 +
 be/src/runtime/row-batch.h | 8 +
 common/thrift/Exprs.thrift | 3 +-
 common/thrift/PlanNodes.thrift | 60 +-
 .../apache/impala/analysis/AggregateInfo.java | 173 +--
 .../impala/analysis/AggregateInfoBase.java | 14 +-
 .../java/org/apache/impala/analysis/Expr.java | 1 +
 .../impala/analysis/MultiAggregateInfo.java | 688 ++++++++++
 .../apache/impala/analysis/NumericLiteral.java | 4 +
 .../org/apache/impala/analysis/SelectStmt.java | 113 +-
 .../apache/impala/analysis/StmtRewriter.java | 35 +-
 .../org/apache/impala/analysis/UnionStmt.java | 19 +-
 .../impala/analysis/ValidTupleIdExpr.java | 96 ++
 .../impala/catalog/AggregateFunction.java | 22 +
 .../org/apache/impala/catalog/BuiltinsDb.java | 81 ++
 .../apache/impala/planner/AggregationNode.java | 442 ++++--
 .../impala/planner/DistributedPlanner.java | 130 +-
 .../impala/planner/SingleNodePlanner.java | 109 +-
 .../impala/analysis/AnalyzeExprsTest.java | 106 +-
 .../impala/analysis/AnalyzeStmtsTest.java | 87 +-
 .../apache/impala/analysis/AnalyzerTest.java | 60 -
 .../org/apache/impala/planner/PlannerTest.java | 25 +
 .../queries/PlannerTest/distinct.test | 45 +
 .../PlannerTest/multiple-distinct-limit.test | 181 +++
 .../multiple-distinct-materialization.test | 972 +++++++++++++
 .../multiple-distinct-predicates.test | 498 +++++++
 .../queries/PlannerTest/multiple-distinct.test | 1280 ++++++++++++++++++
 .../QueryTest/multiple-distinct-aggs.test | 424 ++++++
 .../queries/QueryTest/spilling-aggs.test | 60 +
 tests/query_test/test_aggregation.py | 8 +-
 51 files changed, 5881 insertions(+), 831 deletions(-)
----------------------------------------------------------------------
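Illustration (not part of the patch): the tuple-union rows and the mini-batch split described under "Execution design" can be modelled with a small standalone sketch. Everything here is an assumption made for illustration: Tuple, TupleRow, Batch and ValidTupleIdx are simplified, hypothetical stand-ins, not Impala's real classes or the actual ValidTupleId expr, and the free function SplitMiniBatches only mirrors the idea of AggregationNodeBase::SplitMiniBatches() from the diff below.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for Impala's Tuple/TupleRow; not the real classes.
struct Tuple { int value; };
using TupleRow = std::vector<const Tuple*>;  // one slot per aggregation class
using Batch = std::vector<TupleRow>;

// Returns the index of the first non-null tuple in 'row', or -1 if none is set.
// Models the idea behind the ValidTupleId expr, not its actual implementation.
int ValidTupleIdx(const TupleRow& row) {
  for (std::size_t i = 0; i < row.size(); ++i) {
    if (row[i] != nullptr) return static_cast<int>(i);
  }
  return -1;
}

// Copies each row of 'batch' into the mini-batch of the aggregator it belongs
// to. As in the patch, it first retries the index that matched the previous
// row, betting that consecutive rows come from the same aggregator.
std::vector<Batch> SplitMiniBatches(const Batch& batch, int num_aggs) {
  assert(num_aggs > 0);
  std::vector<Batch> mini_batches(num_aggs);
  int last_idx = 0;
  for (const TupleRow& row : batch) {
    assert(static_cast<int>(row.size()) == num_aggs);
    int idx = (row[last_idx] != nullptr) ? last_idx : ValidTupleIdx(row);
    assert(idx >= 0 && idx < num_aggs);
    last_idx = idx;
    mini_batches[idx].push_back(row);
  }
  return mini_batches;
}
```

With two aggregation classes, a row {&a, nullptr} lands in mini-batch 0 and a row {nullptr, &b} in mini-batch 1. The real code copies rows into pre-allocated RowBatch objects rather than growing vectors.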
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/codegen/gen_ir_descriptions.py ---------------------------------------------------------------------- diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py index 99a97ae..8175b59 100755 --- a/be/src/codegen/gen_ir_descriptions.py +++ b/be/src/codegen/gen_ir_descriptions.py @@ -52,7 +52,7 @@ ir_functions = [ ["NON_GROUPING_AGG_ADD_BATCH_IMPL", "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"], ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL", - "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEbNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"], + "_ZN6impala18GroupingAggregator21AddBatchStreamingImplEibNS_13TPrefetchMode4typeEPNS_8RowBatchES4_PNS_12HashTableCtxEPi"], ["AVG_UPDATE_BIGINT", "_ZN6impala18AggregateFunctions9AvgUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE"], ["AVG_UPDATE_DOUBLE", http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 4544b95..385aa49 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") add_library(Exec aggregation-node.cc + aggregation-node-base.cc aggregator.cc analytic-eval-node.cc base-sequence-scanner.cc http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc new file mode 100644 index 0000000..5de54be --- /dev/null +++ b/be/src/exec/aggregation-node-base.cc @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/aggregation-node-base.h" + +#include "exec/grouping-aggregator.h" +#include "exec/non-grouping-aggregator.h" +#include "runtime/runtime-state.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +namespace impala { + +AggregationNodeBase::AggregationNodeBase( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), replicate_input_(tnode.agg_node.replicate_input) {} + +Status AggregationNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { + // The conjuncts will be evaluated in the Aggregator, so don't pass them to the + // ExecNode. TODO: remove this once we assign conjuncts directly to Aggregators. 
+ TPlanNode tnode_no_conjuncts(tnode); + tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>()); + RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state)); + + int num_ags = tnode.agg_node.aggregators.size(); + for (int i = 0; i < num_ags; ++i) { + const TAggregator& agg = tnode.agg_node.aggregators[i]; + unique_ptr<Aggregator> node; + if (agg.grouping_exprs.empty()) { + node.reset(new NonGroupingAggregator(this, pool_, agg, state->desc_tbl(), i)); + } else { + node.reset(new GroupingAggregator(this, pool_, agg, state->desc_tbl(), + tnode.agg_node.estimated_input_cardinality, i)); + } + aggs_.push_back(std::move(node)); + RETURN_IF_ERROR(aggs_[i]->Init(agg, state, tnode.conjuncts)); + runtime_profile_->AddChild(aggs_[i]->runtime_profile()); + } + DCHECK(aggs_.size() > 0); + return Status::OK(); +} + +Status AggregationNodeBase::Prepare(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecNode::Prepare(state)); + for (auto& agg : aggs_) { + agg->SetDebugOptions(debug_options_); + RETURN_IF_ERROR(agg->Prepare(state)); + } + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); + return Status::OK(); +} + +void AggregationNodeBase::Codegen(RuntimeState* state) { + DCHECK(state->ShouldCodegen()); + ExecNode::Codegen(state); + if (IsNodeCodegenDisabled()) return; + for (auto& agg : aggs_) agg->Codegen(state); +} + +Status AggregationNodeBase::Reset(RuntimeState* state) { + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Reset(state)); + curr_output_agg_idx_ = 0; + return ExecNode::Reset(state); +} + +Status AggregationNodeBase::SplitMiniBatches( + RowBatch* batch, vector<unique_ptr<RowBatch>>* mini_batches) { + int num_tuples = child(0)->row_desc()->tuple_descriptors().size(); + int num_rows = batch->num_rows(); + int last_agg_idx = 0; + for (int i = 0; i < num_rows; ++i) { + TupleRow* src_row = batch->GetRow(i); + int dst_agg_idx = -1; + // Optimization that bets that the index of non-null agg will be the same 
from one + // tuple to the next. + if (src_row->GetTuple(last_agg_idx) != nullptr) { + dst_agg_idx = last_agg_idx; + } else { + for (int j = 0; j < num_tuples; ++j) { + if (src_row->GetTuple(j) != nullptr) { + dst_agg_idx = j; + last_agg_idx = j; + break; + } + } + } + + DCHECK_GE(dst_agg_idx, 0); + DCHECK_LT(dst_agg_idx, num_tuples); + + RowBatch* mini_batch = (*mini_batches)[dst_agg_idx].get(); + TupleRow* dst_row = mini_batch->GetRow(mini_batch->AddRow()); + batch->CopyRow(src_row, dst_row); + mini_batch->CommitLastRow(); + } + return Status::OK(); +} + +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h new file mode 100644 index 0000000..7dfab34 --- /dev/null +++ b/be/src/exec/aggregation-node-base.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef IMPALA_EXEC_AGGREGATION_NODE_BASE_H +#define IMPALA_EXEC_AGGREGATION_NODE_BASE_H + +#include <memory> + +#include "exec/aggregator.h" +#include "exec/exec-node.h" + +namespace impala { + +/// Base class containing common code for the ExecNodes that do aggregation, +/// AggregationNode and StreamingAggregationNode. +class AggregationNodeBase : public ExecNode { + public: + AggregationNodeBase( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; + virtual Status Prepare(RuntimeState* state) override; + virtual void Codegen(RuntimeState* state) override; + virtual Status Reset(RuntimeState* state) override; + + protected: + /// If true, the input to this node should be passed into each Aggregator in 'aggs_'. + /// Otherwise, the input should be divided between the Aggregators using + /// AggregationNodeBase::SplitMiniBatches(). + const bool replicate_input_; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Performs the actual work of aggregating input rows. + std::vector<std::unique_ptr<Aggregator>> aggs_; + + /// The index in 'aggs_' of the Aggregator which we are currently returning rows from in + /// GetNext(). + int curr_output_agg_idx_ = 0; + + /// END: Members that must be Reset() + ///////////////////////////////////////// + + /// Splits the rows of 'batch' up according to which tuple of the row is non-null such + /// that a row with tuple 'i' non-null is copied into the batch 'mini_batches[i]'. + /// It is expected that all rows of 'batch' have exactly 1 non-null tuple. 
+ Status SplitMiniBatches( + RowBatch* batch, std::vector<std::unique_ptr<RowBatch>>* mini_batches); +}; +} // namespace impala + +#endif // IMPALA_EXEC_AGGREGATION_NODE_BASE_H http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc index 9f76768..eabe7da 100644 --- a/be/src/exec/aggregation-node.cc +++ b/be/src/exec/aggregation-node.cc @@ -19,11 +19,10 @@ #include <sstream> -#include "exec/grouping-aggregator.h" -#include "exec/non-grouping-aggregator.h" #include "gutil/strings/substitute.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" #include "util/debug-util.h" #include "util/runtime-profile-counters.h" @@ -35,38 +34,10 @@ namespace impala { AggregationNode::AggregationNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} - -Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { - // The conjuncts will be evaluated in the Aggregator, so don't pass them to the - // ExecNode. TODO: remove this once we assign conjuncts directly to Aggregators. 
- TPlanNode tnode_no_conjuncts(tnode); - tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>()); - RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state)); - if (tnode.agg_node.grouping_exprs.empty()) { - aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, state->desc_tbl())); - } else { - aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl())); + : AggregationNodeBase(pool, tnode, descs) { + for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) { + DCHECK(!tnode.agg_node.aggregators[i].use_streaming_preaggregation); } - RETURN_IF_ERROR(aggregator_->Init(tnode, state)); - runtime_profile_->AddChild(aggregator_->runtime_profile()); - return Status::OK(); -} - -Status AggregationNode::Prepare(RuntimeState* state) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); - RETURN_IF_ERROR(ExecNode::Prepare(state)); - aggregator_->SetDebugOptions(debug_options_); - RETURN_IF_ERROR(aggregator_->Prepare(state)); - state->CheckAndAddCodegenDisabledMessage(runtime_profile()); - return Status::OK(); -} - -void AggregationNode::Codegen(RuntimeState* state) { - DCHECK(state->ShouldCodegen()); - ExecNode::Codegen(state); - if (IsNodeCodegenDisabled()) return; - aggregator_->Codegen(state); } Status AggregationNode::Open(RuntimeState* state) { @@ -74,17 +45,53 @@ Status AggregationNode::Open(RuntimeState* state) { // Open the child before consuming resources in this node. RETURN_IF_ERROR(child(0)->Open(state)); RETURN_IF_ERROR(ExecNode::Open(state)); + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Open(state)); + RowBatch child_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); + + int num_aggs = aggs_.size(); + // Create mini batches. 
+ vector<unique_ptr<RowBatch>> mini_batches; + if (!replicate_input_ && num_aggs > 1) { + for (int i = 0; i < num_aggs; ++i) { + mini_batches.push_back(make_unique<RowBatch>( + child(0)->row_desc(), state->batch_size(), mem_tracker())); + } + } - RETURN_IF_ERROR(aggregator_->Open(state)); - - RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); // Read all the rows from the child and process them. bool eos = false; do { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos)); - RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch)); - batch.Reset(); + RETURN_IF_ERROR(children_[0]->GetNext(state, &child_batch, &eos)); + + if (num_aggs == 1) { + RETURN_IF_ERROR(aggs_[0]->AddBatch(state, &child_batch)); + child_batch.Reset(); + continue; + } + + if (replicate_input_) { + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->AddBatch(state, &child_batch)); + child_batch.Reset(); + continue; + } + + // Separate input batch into mini batches destined for the different aggs. 
+ int num_tuples = child(0)->row_desc()->tuple_descriptors().size(); + DCHECK_EQ(num_aggs, num_tuples); + int num_rows = child_batch.num_rows(); + if (num_rows > 0) { + RETURN_IF_ERROR(SplitMiniBatches(&child_batch, &mini_batches)); + + for (int i = 0; i < num_tuples; ++i) { + RowBatch* mini_batch = mini_batches[i].get(); + if (mini_batch->num_rows() > 0) { + RETURN_IF_ERROR(aggs_[i]->AddBatch(state, mini_batch)); + mini_batch->Reset(); + } + } + } + child_batch.Reset(); } while (!eos); // The child can be closed at this point in most cases because we have consumed all of @@ -93,7 +100,7 @@ Status AggregationNode::Open(RuntimeState* state) { // child again, if (!IsInSubplan()) child(0)->Close(state); - RETURN_IF_ERROR(aggregator_->InputDone()); + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->InputDone()); return Status::OK(); } @@ -102,35 +109,38 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); RETURN_IF_CANCELLED(state); - if (ReachedLimit()) { + if (curr_output_agg_idx_ >= aggs_.size() || ReachedLimit()) { *eos = true; return Status::OK(); } - RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, eos)); + // With multiple Aggregators, each will only set a single tuple per row. We rely on the + // other tuples to be null to detect which Aggregator set which row. 
+ if (aggs_.size() > 1) row_batch->ClearTuplePointers(); + + bool pagg_eos = false; + RETURN_IF_ERROR(aggs_[curr_output_agg_idx_]->GetNext(state, row_batch, &pagg_eos)); + if (pagg_eos) ++curr_output_agg_idx_; + + *eos = ReachedLimit() || (pagg_eos && curr_output_agg_idx_ >= aggs_.size()); num_rows_returned_ += row_batch->num_rows(); COUNTER_SET(rows_returned_counter_, num_rows_returned_); return Status::OK(); } -Status AggregationNode::Reset(RuntimeState* state) { - RETURN_IF_ERROR(aggregator_->Reset(state)); - return ExecNode::Reset(state); -} - void AggregationNode::Close(RuntimeState* state) { if (is_closed()) return; // All expr mem allocations should happen in the Aggregator. DCHECK(expr_results_pool() == nullptr || expr_results_pool()->total_allocated_bytes() == 0); - aggregator_->Close(state); + for (auto& agg : aggs_) agg->Close(state); ExecNode::Close(state); } void AggregationNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); - *out << "AggregationNode(" - << "aggregator=" << aggregator_->DebugString(); + *out << "AggregationNode("; + for (auto& agg : aggs_) agg->DebugString(indentation_level, out); ExecNode::DebugString(indentation_level, out); *out << ")"; } http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h index 527a62d..dfec577 100644 --- a/be/src/exec/aggregation-node.h +++ b/be/src/exec/aggregation-node.h @@ -20,8 +20,7 @@ #include <memory> -#include "exec/aggregator.h" -#include "exec/exec-node.h" +#include "exec/aggregation-node-base.h" namespace impala { @@ -31,29 +30,15 @@ class RuntimeState; /// Node for doing partitioned hash aggregation. /// This node consumes the input from child(0) during Open() and then passes it to the /// Aggregator, which does the actual work of aggregating. 
-class AggregationNode : public ExecNode { +class AggregationNode : public AggregationNodeBase { public: AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; - virtual Status Prepare(RuntimeState* state) override; - virtual void Codegen(RuntimeState* state) override; virtual Status Open(RuntimeState* state) override; virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; - virtual Status Reset(RuntimeState* state) override; virtual void Close(RuntimeState* state) override; virtual void DebugString(int indentation_level, std::stringstream* out) const override; - - private: - ///////////////////////////////////////// - /// BEGIN: Members that must be Reset() - - /// Performs the actual work of aggregating input rows. - std::unique_ptr<Aggregator> aggregator_; - - /// END: Members that must be Reset() - ///////////////////////////////////////// }; } // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregator.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc index 9e79bf4..a63d364 100644 --- a/be/src/exec/aggregator.cc +++ b/be/src/exec/aggregator.cc @@ -43,41 +43,41 @@ namespace impala { const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator"; -Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs, const std::string& name) +Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, + const TAggregator& taggregator, const DescriptorTbl& descs, const std::string& name, + int agg_idx) : id_(exec_node->id()), exec_node_(exec_node), + agg_idx_(agg_idx), pool_(pool), - intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id), + intermediate_tuple_id_(taggregator.intermediate_tuple_id), 
intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)), - output_tuple_id_(tnode.agg_node.output_tuple_id), + output_tuple_id_(taggregator.output_tuple_id), output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)), row_desc_(*exec_node->row_desc()), input_row_desc_(*exec_node->child(0)->row_desc()), - needs_finalize_(tnode.agg_node.need_finalize), - runtime_profile_(RuntimeProfile::Create(pool_, name)), - num_rows_returned_(0), - rows_returned_counter_(nullptr), - build_timer_(nullptr) {} + needs_finalize_(taggregator.need_finalize), + runtime_profile_(RuntimeProfile::Create(pool_, name)) {} Aggregator::~Aggregator() {} -Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) { +Status Aggregator::Init(const TAggregator& taggregator, RuntimeState* state, + const std::vector<TExpr>& conjuncts) { DCHECK(intermediate_tuple_desc_ != nullptr); DCHECK(output_tuple_desc_ != nullptr); DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size()); - int j = num_grouping_exprs(); - for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) { + int j = GetNumGroupingExprs(); + for (int i = 0; i < taggregator.aggregate_functions.size(); ++i, ++j) { SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; AggFn* agg_fn; - RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], input_row_desc_, + RETURN_IF_ERROR(AggFn::Create(taggregator.aggregate_functions[i], input_row_desc_, *intermediate_slot_desc, *output_slot_desc, state, &agg_fn)); agg_fns_.push_back(agg_fn); } - RETURN_IF_ERROR(ScalarExpr::Create(tnode.conjuncts, row_desc_, state, &conjuncts_)); + RETURN_IF_ERROR(ScalarExpr::Create(conjuncts, row_desc_, state, &conjuncts_)); return Status::OK(); } @@ -123,7 +123,7 @@ void Aggregator::Close(RuntimeState* state) { void Aggregator::InitAggSlots( const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* 
intermediate_tuple) { vector<SlotDescriptor*>::const_iterator slot_desc = - intermediate_tuple_desc_->slots().begin() + num_grouping_exprs(); + intermediate_tuple_desc_->slots().begin() + GetNumGroupingExprs(); for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) { // To minimize branching on the UpdateTuple path, initialize the result value so that // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for @@ -184,7 +184,7 @@ Tuple* Aggregator::GetOutputTuple( // Copy grouping values from tuple to dst. // TODO: Codegen this. if (dst != tuple) { - int num_grouping_slots = num_grouping_exprs(); + int num_grouping_slots = GetNumGroupingExprs(); for (int i = 0; i < num_grouping_slots; ++i) { SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i]; SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i]; @@ -568,7 +568,7 @@ Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) // Loop over each expr and generate the IR for that slot. If the expr is not // count(*), generate a helper IR function to update the slot and call that. - int j = num_grouping_exprs(); + int j = GetNumGroupingExprs(); for (int i = 0; i < agg_fns_.size(); ++i, ++j) { SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j]; AggFn* agg_fn = agg_fns_[i]; http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/aggregator.h ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h index 1ea3c47..bd2f10e 100644 --- a/be/src/exec/aggregator.h +++ b/be/src/exec/aggregator.h @@ -48,7 +48,7 @@ class RuntimeState; class ScalarExpr; class ScalarExprEvaluator; class SlotDescriptor; -class TPlanNode; +class TAggregator; class Tuple; class TupleDescriptor; class TupleRow; @@ -60,13 +60,15 @@ class TupleRow; /// be called and the results can be fetched with GetNext(). 
class Aggregator { public: - Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs, const std::string& name); + /// 'agg_idx' is the index of 'taggregator' in the parent TAggregationNode. + Aggregator(ExecNode* exec_node, ObjectPool* pool, const TAggregator& taggregator, + const DescriptorTbl& descs, const std::string& name, int agg_idx); virtual ~Aggregator(); /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and /// before GetNext() rows should be added with AddBatch(), followed by InputDone()[ - virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status Init(const TAggregator& taggregator, RuntimeState* state, + const std::vector<TExpr>& conjuncts) WARN_UNUSED_RESULT; virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; virtual void Codegen(RuntimeState* state) = 0; virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT; @@ -77,10 +79,21 @@ class Aggregator { /// Adds all of the rows in 'batch' to the aggregation. virtual Status AddBatch(RuntimeState* state, RowBatch* batch) = 0; + + /// Used to insert input rows if this is a streaming pre-agg. Tries to aggregate all of + /// the rows of 'child_batch', but if there isn't enough memory available rows will be + /// streamed through and returned in 'out_batch'. If 'eos' is true, 'child_batch' was + /// fully processed, otherwise 'out_batch' was filled up and AddBatchStreaming() should + /// be called again with the same 'child_batch' and a new 'out_batch'. + /// AddBatch() and AddBatchStreaming() should not be called on the same Aggregator. + virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch, + RowBatch* child_batch, bool* eos) WARN_UNUSED_RESULT = 0; + /// Indicates that all batches have been added. Must be called before GetNext(). 
- virtual Status InputDone() = 0; + virtual Status InputDone() WARN_UNUSED_RESULT = 0; + + virtual int GetNumGroupingExprs() = 0; - virtual int num_grouping_exprs() = 0; RuntimeProfile* runtime_profile() { return runtime_profile_; } virtual void SetDebugOptions(const TDebugOptions& debug_options) = 0; @@ -92,8 +105,13 @@ class Aggregator { protected: /// The id of the ExecNode this Aggregator corresponds to. - int id_; + const int id_; ExecNode* exec_node_; + + /// The index of this Aggregator within the AggregationNode. When returning output, this + /// Aggregator should only write tuples at 'agg_idx_' within the row. + const int agg_idx_; + ObjectPool* pool_; /// Account for peak memory used by this aggregator. @@ -154,11 +172,11 @@ class Aggregator { /// Runtime profile for this aggregator. Owned by 'pool_'. RuntimeProfile* const runtime_profile_; - int64_t num_rows_returned_; - RuntimeProfile::Counter* rows_returned_counter_; + int64_t num_rows_returned_ = 0; + RuntimeProfile::Counter* rows_returned_counter_ = nullptr; /// Time spent processing the child rows - RuntimeProfile::Counter* build_timer_; + RuntimeProfile::Counter* build_timer_ = nullptr; /// Initializes the aggregate function slots of an intermediate tuple. /// Any var-len data is allocated from the FunctionContexts. 
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index bbb9a4b..b9406dd 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -274,7 +274,7 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      if (tnode.agg_node.use_streaming_preaggregation) {
+      if (tnode.agg_node.aggregators[0].use_streaming_preaggregation) {
         *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
       } else {
         *node = pool->Add(new AggregationNode(pool, tnode, descs));
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc
index d3dbf17..9f7c399 100644
--- a/be/src/exec/grouping-aggregator-ir.cc
+++ b/be/src/exec/grouping-aggregator-ir.cc
@@ -152,18 +152,18 @@ Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partitio
   }
 }
-Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize,
+Status GroupingAggregator::AddBatchStreamingImpl(int agg_idx, bool needs_serialize,
     TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
     HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
   DCHECK(is_streaming_preagg_);
-  DCHECK_EQ(out_batch->num_rows(), 0);
-  DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
+  DCHECK(!out_batch->AtCapacity());
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int num_rows = in_batch->num_rows();
   const int cache_size = expr_vals_cache->capacity();
-  for (int group_start = 0; group_start < num_rows; group_start += cache_size) {
+  for (int group_start = streaming_idx_; group_start < num_rows;
+       group_start += cache_size) {
     EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx);
     FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
@@ -184,15 +184,21 @@ Status GroupingAggregator::AddBatchStreamingImpl(bool needs_serialize,
           return std::move(add_batch_status_);
         }
         UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row);
-        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+        out_batch_iterator.Get()->SetTuple(agg_idx, intermediate_tuple);
         out_batch_iterator.Next();
         out_batch->CommitLastRow();
+        if (out_batch->AtCapacity() && in_batch_iter.RowNum() + 1 < num_rows) {
+          streaming_idx_ = in_batch_iter.RowNum() + 1;
+          goto ret;
+        }
       }
       DCHECK(add_batch_status_.ok());
       expr_vals_cache->NextRow();
     }
     DCHECK(expr_vals_cache->AtEnd());
   }
+  streaming_idx_ = 0;
+ret:
   if (needs_serialize) {
     FOREACH_ROW(out_batch, 0, out_batch_iter) {
       AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0));
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 0ce2a71..9b38a0c 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -36,6 +36,7 @@
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "util/runtime-profile-counters.h"
+#include "util/string-parser.h"
 #include "gen-cpp/PlanNodes_types.h"
@@ -85,40 +86,24 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TPlanNode& tnode, const DescriptorTbl& descs)
-  : Aggregator(exec_node, pool, tnode, descs, "GroupingAggregator"),
+    const TAggregator& taggregator, const DescriptorTbl& descs,
+    int64_t estimated_input_cardinality, int agg_idx)
+  : Aggregator(exec_node, pool, taggregator, descs,
+        Substitute("GroupingAggregator $0", agg_idx), agg_idx),
     intermediate_row_desc_(intermediate_tuple_desc_, false),
-    is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
-    needs_serialize_(false),
-    output_partition_(nullptr),
-    resource_profile_(exec_node->resource_profile()),
-    num_input_rows_(0),
+    is_streaming_preagg_(taggregator.use_streaming_preaggregation),
+    resource_profile_(taggregator.resource_profile),
     is_in_subplan_(exec_node->IsInSubplan()),
     limit_(exec_node->limit()),
-    add_batch_impl_fn_(nullptr),
-    add_batch_streaming_impl_fn_(nullptr),
-    ht_resize_timer_(nullptr),
-    get_results_timer_(nullptr),
-    num_hash_buckets_(nullptr),
-    partitions_created_(nullptr),
-    max_partition_level_(nullptr),
-    num_row_repartitioned_(nullptr),
-    num_repartitions_(nullptr),
-    num_spilled_partitions_(nullptr),
-    largest_partition_percent_(nullptr),
-    streaming_timer_(nullptr),
-    num_passthrough_rows_(nullptr),
-    preagg_estimated_reduction_(nullptr),
-    preagg_streaming_ht_min_reduction_(nullptr),
-    estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality),
-    partition_eos_(false),
+    estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
 }
-Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status GroupingAggregator::Init(const TAggregator& taggregator, RuntimeState* state,
+    const std::vector<TExpr>& conjuncts) {
   RETURN_IF_ERROR(ScalarExpr::Create(
-      tnode.agg_node.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
+      taggregator.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
   // Construct build exprs from intermediate_row_desc_
   for (int i = 0; i < grouping_exprs_.size(); ++i) {
@@ -133,7 +118,7 @@ Status GroupingAggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
     if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i);
   }
-  RETURN_IF_ERROR(Aggregator::Init(tnode, state));
+  RETURN_IF_ERROR(Aggregator::Init(taggregator, state, conjuncts));
   for (int i = 0; i < agg_fns_.size(); ++i) {
     needs_serialize_ |= agg_fns_[i]->SupportsSerialize();
   }
@@ -283,7 +268,7 @@ Status GroupingAggregator::GetRowsFromPartition(
     Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals,
         intermediate_tuple, row_batch->tuple_data_pool());
     output_iterator_.Next();
-    row->SetTuple(0, output_tuple);
+    row->SetTuple(agg_idx_, output_tuple);
     DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
     if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
@@ -382,6 +367,7 @@ void GroupingAggregator::CleanupHashTbl(
 Status GroupingAggregator::Reset(RuntimeState* state) {
   DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
   partition_eos_ = false;
+  streaming_idx_ = 0;
   // Reset the HT and the partitions for this grouping agg.
   ht_ctx_->set_level(0);
   ClosePartitions();
@@ -399,7 +385,6 @@ void GroupingAggregator::Close(RuntimeState* state) {
   }
   ScalarExpr::Close(grouping_exprs_);
   ScalarExpr::Close(build_exprs_);
-  reservation_manager_.Close(state);
   if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
   // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
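Reviewer note: the 'streaming_idx_' changes above turn AddBatchStreamingImpl() into a resumable loop. It stops when the output batch fills up with input still unprocessed, remembers where to resume, and resets the position to 0 once the input is exhausted. The sketch below isolates only that control flow, under simplifying assumptions: every input row is passed through (the real code first tries to aggregate rows into the hash table), and OutBatch and StreamingSketch are hypothetical types invented for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bounded output batch; stands in for RowBatch.
struct OutBatch {
  std::size_t capacity;
  std::vector<int> rows;
  bool AtCapacity() const { return rows.size() >= capacity; }
};

class StreamingSketch {
 public:
  // Copies rows from 'in' to 'out', starting at the saved resume position.
  // Returns true when all of 'in' has been consumed, which is the analogue of
  // '*eos = (streaming_idx_ == 0)' in the patch.
  // Precondition, as in the patch: 'out' is not already at capacity.
  bool AddBatchStreaming(const std::vector<int>& in, OutBatch* out) {
    assert(!out->AtCapacity());
    for (std::size_t i = streaming_idx_; i < in.size(); ++i) {
      out->rows.push_back(in[i]);
      if (out->AtCapacity() && i + 1 < in.size()) {
        streaming_idx_ = i + 1;  // Resume here on the next call.
        return false;
      }
    }
    streaming_idx_ = 0;  // Whole input consumed; the next call starts a new batch.
    return true;
  }

 private:
  std::size_t streaming_idx_ = 0;
};
```

With a five-row input and an output capacity of two, the caller needs three calls with the same input, draining the output between calls. Only the third call returns true. Note that a row which fills the output exactly at the end of the input does not trigger a resume, matching the 'RowNum() + 1 < num_rows' check in the patch.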
@@ -422,7 +407,8 @@ Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
 }
 Status GroupingAggregator::AddBatchStreaming(
-    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch) {
+    RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(streaming_timer_);
   RETURN_IF_ERROR(QueryMaintenance(state));
   num_input_rows_ += child_batch->num_rows();
@@ -457,12 +443,13 @@ Status GroupingAggregator::AddBatchStreaming(
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
   if (add_batch_streaming_impl_fn_ != nullptr) {
-    RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, needs_serialize_, prefetch_mode,
-        child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
+    RETURN_IF_ERROR(add_batch_streaming_impl_fn_(this, agg_idx_, needs_serialize_,
+        prefetch_mode, child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
   } else {
-    RETURN_IF_ERROR(AddBatchStreamingImpl(needs_serialize_, prefetch_mode, child_batch,
-        out_batch, ht_ctx_.get(), remaining_capacity));
+    RETURN_IF_ERROR(AddBatchStreamingImpl(agg_idx_, needs_serialize_, prefetch_mode,
+        child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
   }
+  *eos = (streaming_idx_ == 0);
   num_rows_returned_ += out_batch->num_rows();
   COUNTER_SET(num_passthrough_rows_, num_rows_returned_);
@@ -1040,12 +1027,16 @@ Status GroupingAggregator::CodegenAddBatchStreamingImpl(
   llvm::Function* add_batch_streaming_impl_fn = codegen->GetFunction(ir_fn, true);
   DCHECK(add_batch_streaming_impl_fn != nullptr);
+  // Make agg_idx arg constant.
+  llvm::Value* agg_idx_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 2);
+  agg_idx_arg->replaceAllUsesWith(codegen->GetI32Constant(agg_idx_));
+
+  // Make needs_serialize arg constant so dead code can be optimised out.
-  llvm::Value* needs_serialize_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 2);
+  llvm::Value* needs_serialize_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 3);
   needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_));
   // Replace prefetch_mode with constant so branches can be optimised out.
-  llvm::Value* prefetch_mode_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 3);
+  llvm::Value* prefetch_mode_arg = codegen->GetArgument(add_batch_streaming_impl_fn, 4);
   prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
   llvm::Function* update_tuple_fn;
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index fb95cd7..6f630d4 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -36,6 +36,7 @@ class AggFnEvaluator;
 class LlvmCodeGen;
 class RowBatch;
 class RuntimeState;
+class TAggregator;
 class Tuple;
 /// Aggregator for doing grouping aggregations. Input is passed to the aggregator through
@@ -113,10 +114,12 @@ class Tuple;
 /// TODO: support an Init() method with an initial value in the UDAF interface.
 class GroupingAggregator : public Aggregator {
  public:
-  GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
-      const DescriptorTbl& descs);
+  GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+      const TAggregator& taggregator, const DescriptorTbl& descs,
+      int64_t estimated_input_cardinality, int agg_idx);
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TAggregator& taggregator, RuntimeState* state,
+      const std::vector<TExpr>& conjuncts) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -125,15 +128,11 @@ class GroupingAggregator : public Aggregator {
   virtual void Close(RuntimeState* state) override;
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;
-  /// Used to insert input rows if this is a streaming pre-agg. Tries to aggregate all of
-  /// the rows of 'child_batch', but if there isn't enough memory available rows will be
-  /// streamed through and returned in 'out_batch'. AddBatch() and AddBatchStreaming()
-  /// should not be called on the same GroupingAggregator.
-  Status AddBatchStreaming(
-      RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch);
-  virtual Status InputDone() override WARN_UNUSED_RESULT;
+  virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch,
+      RowBatch* child_batch, bool* eos) override;
+  virtual Status InputDone() override;
-  virtual int num_grouping_exprs() override { return grouping_exprs_.size(); }
+  virtual int GetNumGroupingExprs() override { return grouping_exprs_.size(); }
   virtual void SetDebugOptions(const TDebugOptions& debug_options) override;
@@ -184,7 +183,7 @@ class GroupingAggregator : public Aggregator {
   const bool is_streaming_preagg_;
   /// True if any of the evaluators require the serialize step.
-  bool needs_serialize_;
+  bool needs_serialize_ = false;
   /// Exprs used to evaluate input rows
   std::vector<ScalarExpr*> grouping_exprs_;
@@ -211,7 +210,7 @@ class GroupingAggregator : public Aggregator {
   /// The current partition and iterator to the next row in its hash table that we need
   /// to return in GetNext(). If 'output_iterator_' is not AtEnd() then
   /// 'output_partition_' is not nullptr.
-  Partition* output_partition_;
+  Partition* output_partition_ = nullptr;
   HashTable::Iterator output_iterator_;
   /// Resource information sent from the frontend.
@@ -222,7 +221,7 @@ class GroupingAggregator : public Aggregator {
   BufferPool::ClientHandle* buffer_pool_client();
   /// The number of rows that have been passed to AddBatch() or AddBatchStreaming().
-  int64_t num_input_rows_;
+  int64_t num_input_rows_ = 0;
   /// True if this aggregator is being executed in a subplan.
   const bool is_in_subplan_;
@@ -233,52 +232,52 @@ class GroupingAggregator : public Aggregator {
   typedef Status (*AddBatchImplFn)(
       GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
-  AddBatchImplFn add_batch_impl_fn_;
+  AddBatchImplFn add_batch_impl_fn_ = nullptr;
-  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, bool,
+  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, int, bool,
       TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
   /// Jitted AddBatchStreamingImpl function pointer. Null if codegen is disabled.
-  AddBatchStreamingImplFn add_batch_streaming_impl_fn_;
+  AddBatchStreamingImplFn add_batch_streaming_impl_fn_ = nullptr;
   /// Total time spent resizing hash tables.
-  RuntimeProfile::Counter* ht_resize_timer_;
+  RuntimeProfile::Counter* ht_resize_timer_ = nullptr;
   /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
+  RuntimeProfile::Counter* get_results_timer_ = nullptr;
   /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
+  RuntimeProfile::Counter* num_hash_buckets_ = nullptr;
   /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
+  RuntimeProfile::Counter* partitions_created_ = nullptr;
   /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
+  RuntimeProfile::HighWaterMarkCounter* max_partition_level_ = nullptr;
   /// Number of rows that have been repartitioned.
-  RuntimeProfile::Counter* num_row_repartitioned_;
+  RuntimeProfile::Counter* num_row_repartitioned_ = nullptr;
   /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
+  RuntimeProfile::Counter* num_repartitions_ = nullptr;
   /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
+  RuntimeProfile::Counter* num_spilled_partitions_ = nullptr;
   /// The largest fraction after repartitioning. This is expected to be
   /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
+  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_ = nullptr;
   /// Time spent in streaming preagg algorithm.
-  RuntimeProfile::Counter* streaming_timer_;
+  RuntimeProfile::Counter* streaming_timer_ = nullptr;
   /// The number of rows passed through without aggregation.
-  RuntimeProfile::Counter* num_passthrough_rows_;
+  RuntimeProfile::Counter* num_passthrough_rows_ = nullptr;
   /// The estimated reduction of the preaggregation.
-  RuntimeProfile::Counter* preagg_estimated_reduction_;
+  RuntimeProfile::Counter* preagg_estimated_reduction_ = nullptr;
   /// Expose the minimum reduction factor to continue growing the hash tables.
-  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
+  RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_ = nullptr;
   /// The estimated number of input rows from the planner.
   int64_t estimated_input_cardinality_;
@@ -289,7 +288,14 @@ class GroupingAggregator : public Aggregator {
   /// BEGIN: Members that must be Reset()
   /// If true, no more rows to output from partitions.
-  bool partition_eos_;
+  bool partition_eos_ = false;
+
+  /// When streaming rows through unaggregated, if the out batch reaches capacity before
+  /// the input batch is fully processed, 'streaming_idx_' indicates the position within
+  /// the input batch to resume at in the next call to AddBatchStreaming(). This is used
+  /// in the case where there are multiple aggregators, as the out batch passed to
+  /// AddBatchStreaming() may already have rows passed through by another aggregator.
+  int32_t streaming_idx_ = 0;
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
   /// It also owns the evaluators for the grouping and build expressions used during hash
@@ -503,8 +509,8 @@ class GroupingAggregator : public Aggregator {
   /// into the hash table or added to 'out_batch' in the intermediate tuple format.
   /// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to
   /// store all of the rows in 'in_batch'.
-  /// 'needs_serialize' is an argument so that codegen can replace it with a constant,
-  /// rather than using the member variable 'needs_serialize_'.
+  /// 'agg_idx' and 'needs_serialize' are arguments so that codegen can replace them with
+  /// constants, rather than using the member variables of the same names.
   /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
   /// hash table buckets will be prefetched based on the hash values computed. Note
   /// that 'prefetch_mode' will be substituted with constants during codegen time.
@@ -512,9 +518,9 @@ class GroupingAggregator : public Aggregator {
   /// additional rows that can be added to the hash table per partition. It is updated
   /// by AddBatchStreamingImpl() when it inserts new rows.
   /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser.
-  Status AddBatchStreamingImpl(bool needs_serialize, TPrefetchMode::type prefetch_mode,
-      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
-      int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
+  Status AddBatchStreamingImpl(int agg_idx, bool needs_serialize,
+      TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
+      HashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
   /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming
   /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/non-grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 1ee4e46..2bcaa98 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -36,11 +36,9 @@
 namespace impala {
 NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TPlanNode& tnode, const DescriptorTbl& descs)
-  : Aggregator(exec_node, pool, tnode, descs, "NonGroupingAggregator"),
-    add_batch_impl_fn_(nullptr),
-    singleton_output_tuple_(nullptr),
-    singleton_output_tuple_returned_(true) {}
+    const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx)
+  : Aggregator(exec_node, pool, taggregator, descs,
+        Substitute("NonGroupingAggregator $0", agg_idx), agg_idx) {}
 Status NonGroupingAggregator::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Prepare(state));
@@ -84,6 +82,7 @@ Status NonGroupingAggregator::GetNext(
 void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
   int row_idx = row_batch->AddRow();
   TupleRow* row = row_batch->GetRow(row_idx);
+  row_batch->ClearRow(row);
   // The output row batch may reference memory allocated by Serialize() or Finalize(),
   // allocating that memory directly from the row batch's pool means we can safely return
   // the batch.
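Reviewer note: the ClearRow() call added to GetSingletonOutput() above is not explained in this hunk. A plausible reading, given that downstream code relies on all but one tuple in a row being null, is that a freshly added row may still hold stale pointers in the other slots. The sketch below illustrates that reading only. RowBuffer, WriteSingleton and CountNonNull are hypothetical names, and the stale-pointer scenario is an assumption, not something the patch states.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for Impala's Tuple.
struct Tuple {
  long value;
};

// Hypothetical reusable row: slots may still hold pointers from an earlier use.
struct RowBuffer {
  std::vector<Tuple*> tuples;
  void Clear() {
    for (Tuple*& t : tuples) t = nullptr;
  }
};

// Analogue of the patched GetSingletonOutput(): clear the whole row first,
// then set only the slot that belongs to this aggregator.
void WriteSingleton(RowBuffer* row, int agg_idx, Tuple* out) {
  row->Clear();
  row->tuples[agg_idx] = out;
}

// Counts the slots that are set. A well-formed tuple-union row has exactly one.
std::size_t CountNonNull(const RowBuffer& row) {
  std::size_t n = 0;
  for (const Tuple* t : row.tuples) n += (t != nullptr);
  return n;
}
```

Without the Clear() call, a row that previously carried a tuple in slot 0 would end up with two non-null slots after an aggregator with index 2 writes to it, and a consumer could no longer tell which aggregator produced the row.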
@@ -91,7 +90,7 @@ void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) { ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool()); Tuple* output_tuple = GetOutputTuple( agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool()); - row->SetTuple(0, output_tuple); + row->SetTuple(agg_idx_, output_tuple); if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), row)) { row_batch->CommitLastRow(); ++num_rows_returned_; @@ -128,6 +127,12 @@ Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) { return Status::OK(); } +Status NonGroupingAggregator::AddBatchStreaming( + RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* eos) { + *eos = true; + return AddBatch(state, child_batch); +} + Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple( const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) { Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool); http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/non-grouping-aggregator.h ---------------------------------------------------------------------- diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h index 41b3e0d..49a663d 100644 --- a/be/src/exec/non-grouping-aggregator.h +++ b/be/src/exec/non-grouping-aggregator.h @@ -33,7 +33,7 @@ class LlvmCodeGen; class ObjectPool; class RowBatch; class RuntimeState; -class TPlanNode; +class TAggregator; class Tuple; /// Aggregator for doing non-grouping aggregations. Input is passed to the aggregator @@ -41,8 +41,8 @@ class Tuple; /// not support streaming preaggregation. 
class NonGroupingAggregator : public Aggregator { public: - NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs); + NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, + const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx); virtual Status Prepare(RuntimeState* state) override; virtual void Codegen(RuntimeState* state) override; @@ -52,9 +52,15 @@ class NonGroupingAggregator : public Aggregator { virtual void Close(RuntimeState* state) override; virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override; + + /// NonGroupingAggregators behave the same in streaming and non-streaming contexts, so + /// this just calls AddBatch. + virtual Status AddBatchStreaming(RuntimeState* state, RowBatch* out_batch, + RowBatch* child_batch, bool* eos) override; + virtual Status InputDone() override { return Status::OK(); } - virtual int num_grouping_exprs() override { return 0; } + virtual int GetNumGroupingExprs() override { return 0; } /// NonGroupingAggregator doesn't create a buffer pool client so it doesn't need the /// debug options. @@ -72,7 +78,7 @@ class NonGroupingAggregator : public Aggregator { typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*); /// Jitted AddBatchImpl function pointer. Null if codegen is disabled. - AddBatchImplFn add_batch_impl_fn_; + AddBatchImplFn add_batch_impl_fn_ = nullptr; ///////////////////////////////////////// /// BEGIN: Members that must be Reset() @@ -80,8 +86,8 @@ class NonGroupingAggregator : public Aggregator { /// Result of aggregation w/o GROUP BY. /// Note: can be NULL even if there is no grouping if the result tuple is 0 width /// e.g. select 1 from table group by col. 
- Tuple* singleton_output_tuple_; - bool singleton_output_tuple_returned_; + Tuple* singleton_output_tuple_ = nullptr; + bool singleton_output_tuple_returned_ = true; /// END: Members that must be Reset() ///////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/streaming-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc index 7280849..7cbebb8 100644 --- a/be/src/exec/streaming-aggregation-node.cc +++ b/be/src/exec/streaming-aggregation-node.cc @@ -22,6 +22,7 @@ #include "gutil/strings/substitute.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" #include "util/runtime-profile-counters.h" #include "gen-cpp/PlanNodes_types.h" @@ -32,35 +33,12 @@ namespace impala { StreamingAggregationNode::StreamingAggregationNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), child_eos_(false) { + : AggregationNodeBase(pool, tnode, descs) { DCHECK(tnode.conjuncts.empty()) << "Preaggs have no conjuncts"; - DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; DCHECK(limit_ == -1) << "Preaggs have no limits"; -} - -Status StreamingAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - aggregator_.reset(new GroupingAggregator(this, pool_, tnode, state->desc_tbl())); - RETURN_IF_ERROR(aggregator_->Init(tnode, state)); - runtime_profile_->AddChild(aggregator_->runtime_profile()); - return Status::OK(); -} - -Status StreamingAggregationNode::Prepare(RuntimeState* state) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); - RETURN_IF_ERROR(ExecNode::Prepare(state)); - aggregator_->SetDebugOptions(debug_options_); - RETURN_IF_ERROR(aggregator_->Prepare(state)); - 
state->CheckAndAddCodegenDisabledMessage(runtime_profile()); - return Status::OK(); -} - -void StreamingAggregationNode::Codegen(RuntimeState* state) { - DCHECK(state->ShouldCodegen()); - ExecNode::Codegen(state); - if (IsNodeCodegenDisabled()) return; - - aggregator_->Codegen(state); + for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) { + DCHECK(tnode.agg_node.aggregators[i].use_streaming_preaggregation); + } } Status StreamingAggregationNode::Open(RuntimeState* state) { @@ -69,7 +47,7 @@ Status StreamingAggregationNode::Open(RuntimeState* state) { RETURN_IF_ERROR(child(0)->Open(state)); RETURN_IF_ERROR(ExecNode::Open(state)); - RETURN_IF_ERROR(aggregator_->Open(state)); + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Open(state)); // Streaming preaggregations do all processing in GetNext(). return Status::OK(); @@ -86,15 +64,22 @@ Status StreamingAggregationNode::GetNext( return Status::OK(); } - bool aggregator_eos = false; - if (!child_eos_) { + // With multiple Aggregators, each will only set a single tuple per row. We rely on the + // other tuples to be null to detect which Aggregator set which row. + if (aggs_.size() > 1) row_batch->ClearTuplePointers(); + + if (!child_eos_ || !child_batch_processed_) { // For streaming preaggregations, we process rows from the child as we go. 
RETURN_IF_ERROR(GetRowsStreaming(state, row_batch)); + *eos = false; } else { - RETURN_IF_ERROR(aggregator_->GetNext(state, row_batch, &aggregator_eos)); + bool aggregator_eos = false; + RETURN_IF_ERROR( + aggs_[curr_output_agg_idx_]->GetNext(state, row_batch, &aggregator_eos)); + if (aggregator_eos) ++curr_output_agg_idx_; + *eos = curr_output_agg_idx_ >= aggs_.size(); } - *eos = aggregator_eos && child_eos_; num_rows_returned_ += row_batch->num_rows(); COUNTER_SET(rows_returned_counter_, num_rows_returned_); return Status::OK(); @@ -102,27 +87,94 @@ Status StreamingAggregationNode::GetNext( Status StreamingAggregationNode::GetRowsStreaming( RuntimeState* state, RowBatch* out_batch) { - DCHECK(!child_eos_); - if (child_batch_ == nullptr) { child_batch_.reset( new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); } + int num_aggs = aggs_.size(); + // Create mini batches. + vector<unique_ptr<RowBatch>> mini_batches; + if (!replicate_input_ && num_aggs > 1) { + for (int i = 0; i < num_aggs; ++i) { + mini_batches.push_back(make_unique<RowBatch>( + child(0)->row_desc(), state->batch_size(), mem_tracker())); + } + } + do { DCHECK_EQ(out_batch->num_rows(), 0); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_)); - - RETURN_IF_ERROR(aggregator_->AddBatchStreaming(state, out_batch, child_batch_.get())); - child_batch_->Reset(); // All rows from child_batch_ were processed. 
+ if (child_batch_processed_) { + DCHECK_EQ(child_batch_->num_rows(), 0); + RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_)); + child_batch_processed_ = false; + } + + if (num_aggs == 1) { + RETURN_IF_ERROR(aggs_[0]->AddBatchStreaming( + state, out_batch, child_batch_.get(), &child_batch_processed_)); + // We're not guaranteed to be able to stream the entirety of 'child_batch_' into + // 'out_batch' as AddBatchStreaming() will attach all var-len data to 'out_batch' + // and 'child_batch_' may have been referencing data that wasn't attached to it. + if (child_batch_processed_) { + child_batch_->Reset(); + } + continue; + } + + if (replicate_input_) { + bool eos = false; + while (replicate_agg_idx_ < num_aggs) { + RETURN_IF_ERROR(aggs_[replicate_agg_idx_]->AddBatchStreaming( + state, out_batch, child_batch_.get(), &eos)); + if (eos) ++replicate_agg_idx_; + if (out_batch->AtCapacity()) break; + // If out_batch isn't full, we must have processed the entire input. + DCHECK(eos); + } + if (replicate_agg_idx_ == num_aggs) { + replicate_agg_idx_ = 0; + child_batch_processed_ = true; + child_batch_->Reset(); + } + continue; + } + + // Separate input batch into mini batches destined for the different aggs. + int num_tuples = child(0)->row_desc()->tuple_descriptors().size(); + DCHECK_EQ(num_aggs, num_tuples); + int num_rows = child_batch_->num_rows(); + DCHECK_LE(num_rows, out_batch->capacity()); + if (num_rows > 0) { + RETURN_IF_ERROR(SplitMiniBatches(child_batch_.get(), &mini_batches)); + + for (int i = 0; i < num_tuples; ++i) { + RowBatch* mini_batch = mini_batches[i].get(); + if (mini_batch->num_rows() > 0) { + bool eos; + RETURN_IF_ERROR( + aggs_[i]->AddBatchStreaming(state, out_batch, mini_batch, &eos)); + // child_batch_'s size is <= out_batch's capacity, so even under high memory + // pressure where all rows are streamed though, we will be able to process + // the entire input. 
Note that unlike the single agg case above, this works + // because any node that hits this path must have been preceded by an + // aggregation node in 'replicate_input_' mode, and so all memory pointed to by + // 'child_batch_' will be attached to 'child_batch_'. + DCHECK(eos); + mini_batch->Reset(); + } + } + } + child_batch_processed_ = true; + child_batch_->Reset(); } while (out_batch->num_rows() == 0 && !child_eos_); if (child_eos_) { child(0)->Close(state); child_batch_.reset(); - RETURN_IF_ERROR(aggregator_->InputDone()); + for (auto& agg : aggs_) RETURN_IF_ERROR(agg->InputDone()); } return Status::OK(); @@ -138,7 +190,7 @@ void StreamingAggregationNode::Close(RuntimeState* state) { // All expr mem allocations should happen in the Aggregator. DCHECK(expr_results_pool() == nullptr || expr_results_pool()->total_allocated_bytes() == 0); - aggregator_->Close(state); + for (auto& agg : aggs_) agg->Close(state); child_batch_.reset(); ExecNode::Close(state); } @@ -146,8 +198,8 @@ void StreamingAggregationNode::Close(RuntimeState* state) { void StreamingAggregationNode::DebugString( int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); - *out << "StreamingAggregationNode(" - << "aggregator=" << aggregator_->DebugString(); + *out << "StreamingAggregationNode("; + for (auto& agg : aggs_) agg->DebugString(indentation_level, out); ExecNode::DebugString(indentation_level, out); *out << ")"; } http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exec/streaming-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h index 8e06b2a..71193b9 100644 --- a/be/src/exec/streaming-aggregation-node.h +++ b/be/src/exec/streaming-aggregation-node.h @@ -20,8 +20,7 @@ #include <memory> -#include "exec/exec-node.h" -#include "exec/grouping-aggregator.h" +#include "exec/aggregation-node-base.h" namespace 
impala { @@ -44,14 +43,11 @@ class RuntimeState; /// the final aggregation. /// /// This node only supports grouping aggregations. -class StreamingAggregationNode : public ExecNode { +class StreamingAggregationNode : public AggregationNodeBase { public: StreamingAggregationNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override; - virtual Status Prepare(RuntimeState* state) override; - virtual void Codegen(RuntimeState* state) override; virtual Status Open(RuntimeState* state) override; virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; virtual Status Reset(RuntimeState* state) override; @@ -63,14 +59,21 @@ class StreamingAggregationNode : public ExecNode { ///////////////////////////////////////// /// BEGIN: Members that must be Reset() - /// Row batch used as argument to GetNext() for the child node preaggregations. Store - /// in node to avoid reallocating for every GetNext() call when streaming. + /// Row batch retrieved from the child and passed to Aggregators in GetNext(). Stored + /// here as we may need it in multiple GetNext() calls if we're streaming rows through. std::unique_ptr<RowBatch> child_batch_; + /// If true, there are no more rows in 'child_batch_' that need to be passed to any + /// Aggregator, and the next call to GetNext() will retrieve another batch, unless + /// 'child_eos_' is true. + bool child_batch_processed_ = true; + /// True if no more rows to process from child. - bool child_eos_; + bool child_eos_ = false; - std::unique_ptr<GroupingAggregator> aggregator_; + /// If 'replicate_input_' is true, the index in 'aggs_' of the next Aggregator to pass + /// 'child_batch_' into. 
+  int32_t replicate_agg_idx_ = 0;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 75312b5..8b270f4 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -63,6 +63,7 @@ add_library(Exprs
   udf-builtins-ir.cc
   utility-functions.cc
   utility-functions-ir.cc
+  valid-tuple-id.cc
 )
 
 add_dependencies(Exprs gen-deps gen_ir_descriptions)

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 33533f5..bc2bc0e 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1690,6 +1690,43 @@ BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx,
   return round(scaled_extrap_ndv * ndv_scale);
 }
 
+template <typename T>
+void AggregateFunctions::AggIfUpdate(
+    FunctionContext* ctx, const BooleanVal& cond, const T& src, T* dst) {
+  DCHECK(!cond.is_null);
+  if (cond.val) *dst = src;
+}
+
+template <>
+void AggregateFunctions::AggIfUpdate(
+    FunctionContext* ctx, const BooleanVal& cond, const StringVal& src, StringVal* dst) {
+  DCHECK(!cond.is_null);
+  if (cond.val) CopyStringVal(ctx, src, dst);
+}
+
+template <typename T>
+void AggregateFunctions::AggIfMerge(FunctionContext*, const T& src, T* dst) {
+  *dst = src;
+}
+
+template <>
+void AggregateFunctions::AggIfMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  CopyStringVal(ctx, src, dst);
+}
+
+template <typename T>
+T AggregateFunctions::AggIfFinalize(FunctionContext*, const T& src) {
+  return src;
+}
+
+template <>
+StringVal AggregateFunctions::AggIfFinalize(FunctionContext* ctx, const StringVal& src) {
+  StringVal result = StringValGetValue(ctx, src);
+  if (!src.is_null) ctx->Free(src.ptr);
+  return result;
+}
+
 // An implementation of a simple single pass variance algorithm. A standard UDA must
 // be single pass (i.e. does not scan the table more than once), so the most canonical
 // two pass approach is not practical.
@@ -2351,6 +2388,62 @@ template void AggregateFunctions::SampledNdvUpdate(
 template void AggregateFunctions::SampledNdvUpdate(
     FunctionContext*, const DecimalVal&, const DoubleVal&, StringVal*);
 
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const BooleanVal& src, BooleanVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const TinyIntVal& src, TinyIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const SmallIntVal& src, SmallIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const IntVal& src, IntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const BigIntVal& src, BigIntVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const FloatVal& src, FloatVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const DoubleVal& src, DoubleVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const TimestampVal& src, TimestampVal* dst);
+template void AggregateFunctions::AggIfUpdate(
+    FunctionContext*, const BooleanVal& cond, const DecimalVal& src, DecimalVal* dst);
+
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const BooleanVal& src, BooleanVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const IntVal& src, IntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const BigIntVal& src, BigIntVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const FloatVal& src, FloatVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const DoubleVal& src, DoubleVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const TimestampVal& src, TimestampVal* dst);
+template void AggregateFunctions::AggIfMerge(
+    FunctionContext*, const DecimalVal& src, DecimalVal* dst);
+
+template BooleanVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const BooleanVal& src);
+template TinyIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const TinyIntVal& src);
+template SmallIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const SmallIntVal& src);
+template IntVal AggregateFunctions::AggIfFinalize(FunctionContext*, const IntVal& src);
+template BigIntVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const BigIntVal& src);
+template FloatVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const FloatVal& src);
+template DoubleVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const DoubleVal& src);
+template TimestampVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const TimestampVal& src);
+template DecimalVal AggregateFunctions::AggIfFinalize(
+    FunctionContext*, const DecimalVal& src);
+
 template void AggregateFunctions::KnuthVarUpdate(
     FunctionContext*, const TinyIntVal&, StringVal*);
 template void AggregateFunctions::KnuthVarUpdate(

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/aggregate-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 243e9f7..e3c1425 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -221,6 +221,16 @@ class AggregateFunctions {
   static void SampledNdvMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static BigIntVal SampledNdvFinalize(FunctionContext*, const StringVal& src);
 
+  /// The AGGIF(predicate, expr) function returns 'expr' if 'predicate' is true.
+  /// It is expected that 'predicate' only returns true for a single row per group.
+  /// The predicate must not evaluate to NULL.
+  template <typename T>
+  static void AggIfUpdate(FunctionContext*, const BooleanVal& cond, const T& src, T* dst);
+  template <typename T>
+  static void AggIfMerge(FunctionContext*, const T& src, T* dst);
+  template <typename T>
+  static T AggIfFinalize(FunctionContext*, const T& src);
+
   /// Knuth's variance algorithm, more numerically stable than canonical stddev
   /// algorithms; reference implementation:
   /// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/scalar-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index 42c2867..00e6709 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -44,6 +44,7 @@
 #include "exprs/tuple-is-null-predicate.h"
 #include "exprs/udf-builtins.h"
 #include "exprs/utility-functions.h"
+#include "exprs/valid-tuple-id.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
@@ -186,6 +187,9 @@ Status ScalarExpr::CreateNode(
     case TExprNodeType::KUDU_PARTITION_EXPR:
       *expr = pool->Add(new KuduPartitionExpr(texpr_node));
       return Status::OK();
+    case TExprNodeType::VALID_TUPLE_ID_EXPR:
+      *expr = pool->Add(new ValidTupleIdExpr(texpr_node));
+      return Status::OK();
     default:
       *expr = nullptr;
       stringstream os;

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/valid-tuple-id.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/valid-tuple-id.cc b/be/src/exprs/valid-tuple-id.cc
new file mode 100644
index 0000000..84cdf2f
--- /dev/null
+++ b/be/src/exprs/valid-tuple-id.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/valid-tuple-id.h"
+
+#include <sstream>
+
+#include "gen-cpp/Exprs_types.h"
+
+#include "common/names.h"
+#include "runtime/descriptors.h"
+#include "runtime/tuple-row.h"
+
+namespace impala {
+
+ValidTupleIdExpr::ValidTupleIdExpr(const TExprNode& node) : ScalarExpr(node) {}
+
+Status ValidTupleIdExpr::Init(const RowDescriptor& row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
+  DCHECK_EQ(0, children_.size());
+  tuple_ids_.reserve(row_desc.tuple_descriptors().size());
+  for (TupleDescriptor* tuple_desc : row_desc.tuple_descriptors()) {
+    tuple_ids_.push_back(tuple_desc->id());
+  }
+  return Status::OK();
+}
+
+int ValidTupleIdExpr::ComputeNonNullCount(const TupleRow* row) const {
+  int num_tuples = tuple_ids_.size();
+  int non_null_count = 0;
+  for (int i = 0; i < num_tuples; ++i) non_null_count += (row->GetTuple(i) != nullptr);
+  return non_null_count;
+}
+
+IntVal ValidTupleIdExpr::GetIntVal(ScalarExprEvaluator* eval, const TupleRow* row) const {
+  // Validate that exactly one tuple is non-NULL.
+  DCHECK_EQ(1, ComputeNonNullCount(row));
+  int num_tuples = tuple_ids_.size();
+  for (int i = 0; i < num_tuples; ++i) {
+    if (row->GetTuple(i) != nullptr) return IntVal(tuple_ids_[i]);
+  }
+  return IntVal::null();
+}
+
+Status ValidTupleIdExpr::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) {
+  return GetCodegendComputeFnWrapper(codegen, fn);
+}
+
+string ValidTupleIdExpr::DebugString() const {
+  return "ValidTupleId()";
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/exprs/valid-tuple-id.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/valid-tuple-id.h b/be/src/exprs/valid-tuple-id.h
new file mode 100644
index 0000000..7eb03d9
--- /dev/null
+++ b/be/src/exprs/valid-tuple-id.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXPRS_VALID_TUPLE_ID_H_
+#define IMPALA_EXPRS_VALID_TUPLE_ID_H_
+
+#include "exprs/scalar-expr.h"
+
+namespace impala {
+
+class TExprNode;
+
+/// Returns the tuple id of the single non-NULL tuple in the input row.
+/// Valid input rows must have exactly one non-NULL tuple.
+class ValidTupleIdExpr : public ScalarExpr {
+ protected:
+  friend class ScalarExpr;
+
+  ValidTupleIdExpr(const TExprNode& node);
+
+  virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) override;
+  virtual Status GetCodegendComputeFn(
+      LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
+  virtual std::string DebugString() const override;
+
+  virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const override;
+
+ private:
+  /// Maps from tuple index in the row to its corresponding tuple id.
+  std::vector<TupleId> tuple_ids_;
+
+  /// Returns the number of tuples in 'row' that are non-null. Used for debugging.
+  int ComputeNonNullCount(const TupleRow* row) const;
+};
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 90d8c4d..1334858 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -242,6 +242,10 @@ class RowBatch {
       return Get();
     }
 
+    /// Returns the index in the RowBatch of the current row. This does an integer
+    /// division and so should not be used in hot inner loops.
+    int RowNum() { return (row_ - parent_->tuple_ptrs_) / num_tuples_per_row_; }
+
     /// Returns true if the iterator is beyond the last row for read iterators.
     /// Useful for read iterators to determine the limit. Write iterators should use
     /// RowBatch::AtCapacity() instead.
@@ -334,6 +338,10 @@ class RowBatch {
         num_rows * num_tuples_per_row_ * sizeof(Tuple*));
   }
 
+  void ClearTuplePointers() {
+    memset(tuple_ptrs_, 0, capacity_ * num_tuples_per_row_ * sizeof(Tuple*));
+  }
+
   void ClearRow(TupleRow* row) {
     memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/common/thrift/Exprs.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Exprs.thrift b/common/thrift/Exprs.thrift
index ee9fe94..584e525 100644
--- a/common/thrift/Exprs.thrift
+++ b/common/thrift/Exprs.thrift
@@ -38,7 +38,8 @@ enum TExprNodeType {
   FUNCTION_CALL,
   AGGREGATE_EXPR,
   IS_NOT_EMPTY_PRED,
-  KUDU_PARTITION_EXPR
+  KUDU_PARTITION_EXPR,
+  VALID_TUPLE_ID_EXPR
 }
 
 struct TBoolLiteral {

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e116c3b..5d245fb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -47,7 +47,8 @@ enum TPlanNodeType {
   UNNEST_NODE,
   SUBPLAN_NODE,
   KUDU_SCAN_NODE,
-  CARDINALITY_CHECK_NODE
+  CARDINALITY_CHECK_NODE,
+  MULTI_AGGREGATION_NODE
 }
 
 // phases of an execution node
@@ -352,7 +353,26 @@ struct TNestedLoopJoinNode {
   2: optional list<Exprs.TExpr> join_conjuncts
 }
 
-struct TAggregationNode {
+// This contains all of the information computed by the plan as part of the resource
+// profile that is needed by the backend to execute.
+struct TBackendResourceProfile {
+  // The minimum reservation for this plan node in bytes.
+  1: required i64 min_reservation
+
+  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
+  // unlimited.
+  2: required i64 max_reservation
+
+  // The spillable buffer size in bytes to use for this node, chosen by the planner.
+  // Set iff the node uses spillable buffers.
+  3: optional i64 spillable_buffer_size
+
+  // The buffer size in bytes that is large enough to fit the largest row to be processed.
+  // Set if the node allocates buffers for rows from the buffer pool.
+  4: optional i64 max_row_buffer_size
+}
+
+struct TAggregator {
   1: optional list<Exprs.TExpr> grouping_exprs
   // aggregate exprs. The root of each expr is the aggregate function. The
   // other exprs are the inputs to the aggregate function.
@@ -366,14 +386,25 @@ struct TAggregationNode {
   // aggregate functions.
   4: required Types.TTupleId output_tuple_id
 
-  // Set to true if this aggregation node needs to run the finalization step.
+  // Set to true if this aggregator needs to run the finalization step.
   5: required bool need_finalize
 
   // Set to true to use the streaming preagg algorithm. Node must be a preaggregation.
   6: required bool use_streaming_preaggregation
 
-  // Estimate of number of input rows from the planner.
-  7: required i64 estimated_input_cardinality
+  7: required TBackendResourceProfile resource_profile
+}
+
+struct TAggregationNode {
+  // Aggregators for this node, each with a unique set of grouping exprs.
+  1: required list<TAggregator> aggregators
+
+  // Used in streaming aggregations to determine how much memory to use.
+  2: required i64 estimated_input_cardinality
+
+  // If true, this is the first AggregationNode in a aggregation plan with multiple
+  // Aggregators and the entire input to this node should be passed to each Aggregator.
+  3: required bool replicate_input
 }
 
 struct TSortInfo {
@@ -522,25 +553,6 @@ struct TUnnestNode {
   1: required Exprs.TExpr collection_expr
 }
 
-// This contains all of the information computed by the plan as part of the resource
-// profile that is needed by the backend to execute.
-struct TBackendResourceProfile {
-  // The minimum reservation for this plan node in bytes.
-  1: required i64 min_reservation
-
-  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
-  // unlimited.
-  2: required i64 max_reservation
-
-  // The spillable buffer size in bytes to use for this node, chosen by the planner.
-  // Set iff the node uses spillable buffers.
-  3: optional i64 spillable_buffer_size
-
-  // The buffer size in bytes that is large enough to fit the largest row to be processed.
-  // Set if the node allocates buffers for rows from the buffer pool.
-  4: optional i64 max_row_buffer_size
-}
-
 struct TCardinalityCheckNode {
   // Associated statement of child
   1: required string display_statement
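
Reviewer illustration (not part of the patch): the `replicate_input` field of `TAggregationNode` can be pictured with a minimal C++ sketch. `SketchRow`, `SketchAggregator` and `RouteRow` are hypothetical names introduced here, not Impala classes. The non-replicated branch, which routes a row to the aggregator whose tuple slot is non-NULL, is an assumption drawn from the commit message's description of the tuple union and may not match how the backend actually dispatches rows.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a TupleRow in the tuple-union layout: one slot per
// aggregation class, where a null pointer means "no tuple for this class".
struct SketchRow {
  std::vector<const int*> tuples;
};

// Hypothetical stand-in for an Aggregator; it only counts the rows it is given.
struct SketchAggregator {
  std::size_t rows_seen = 0;
  void AddRow(const SketchRow&) { ++rows_seen; }
};

// Routes one input row to the aggregators of a node.
// - replicate_input == true: every aggregator receives the row, as described for
//   the first aggregation node in a plan with multiple aggregators.
// - replicate_input == false: ASSUMED behavior for later nodes, where only the
//   aggregator whose slot in the row is non-null receives it.
void RouteRow(const SketchRow& row, bool replicate_input,
    std::vector<SketchAggregator>& aggs) {
  if (replicate_input) {
    for (SketchAggregator& agg : aggs) agg.AddRow(row);
    return;
  }
  // In the tuple-union layout the row has exactly one slot per aggregator.
  assert(row.tuples.size() == aggs.size());
  for (std::size_t i = 0; i < aggs.size(); ++i) {
    if (row.tuples[i] != nullptr) {
      aggs[i].AddRow(row);
      return;
    }
  }
}
```

With two aggregators, calling `RouteRow` with `replicate_input` set to true increments both counters, while a row whose only non-null slot is index 1 increments just the second counter when `replicate_input` is false.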