http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc new file mode 100644 index 0000000..585c264 --- /dev/null +++ b/be/src/exec/non-grouping-aggregator.cc @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/non-grouping-aggregator.h" + +#include <sstream> + +#include "codegen/llvm-codegen.h" +#include "exec/exec-node.h" +#include "exprs/agg-fn-evaluator.h" +#include "gutil/strings/substitute.h" +#include "runtime/descriptors.h" +#include "runtime/mem-pool.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "runtime/tuple.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +namespace impala { + +NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, + const TPlanNode& tnode, const DescriptorTbl& descs) + : Aggregator(exec_node, pool, tnode, descs, "NonGroupingAggregator"), + add_batch_impl_fn_(nullptr), + singleton_output_tuple_(nullptr), + singleton_output_tuple_returned_(true) {} + +Status NonGroupingAggregator::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(Aggregator::Prepare(state)); + singleton_tuple_pool_.reset(new MemPool(mem_tracker_.get())); + return Status::OK(); +} + +void NonGroupingAggregator::Codegen(RuntimeState* state) { + LlvmCodeGen* codegen = state->codegen(); + DCHECK(codegen != nullptr); + TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; + Status codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode); + runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status); +} + +Status NonGroupingAggregator::Open(RuntimeState* state) { + RETURN_IF_ERROR(Aggregator::Open(state)); + + // Create the single output tuple for this non-grouping agg. This must happen after + // opening the aggregate evaluators. + singleton_output_tuple_ = + ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get()); + // Check for failures during AggFnEvaluator::Init(). + RETURN_IF_ERROR(state->GetQueryStatus()); + singleton_output_tuple_returned_ = false; + + return Status::OK(); +} + +Status NonGroupingAggregator::GetNext( + RuntimeState* state, RowBatch* row_batch, bool* eos) { + // There was no grouping, so evaluate the conjuncts and return the single result row. + // We allow calling GetNext() after eos, so don't return this row again. + if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch); + singleton_output_tuple_returned_ = true; + *eos = true; + return Status::OK(); +} + +void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) { + int row_idx = row_batch->AddRow(); + TupleRow* row = row_batch->GetRow(row_idx); + // The output row batch may reference memory allocated by Serialize() or Finalize(), + // allocating that memory directly from the row batch's pool means we can safely return + // the batch. + vector<ScopedResultsPool> allocate_from_batch_pool = + ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool()); + Tuple* output_tuple = GetOutputTuple( + agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool()); + row->SetTuple(0, output_tuple); + if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), row)) { + row_batch->CommitLastRow(); + ++num_rows_returned_; + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + } + // Keep the current chunk to amortize the memory allocation over a series + // of Reset()/Open()/GetNext()* calls. + row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true); + // This node no longer owns the memory for singleton_output_tuple_. + singleton_output_tuple_ = nullptr; +} + +void NonGroupingAggregator::Close(RuntimeState* state) { + if (!singleton_output_tuple_returned_) { + GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get()); + } + + if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll(); + // Must be called after singleton_tuple_pool_ is freed, so that mem_tracker_ can be + // closed. + Aggregator::Close(state); +} + +Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(build_timer_); + + if (add_batch_impl_fn_ != nullptr) { + RETURN_IF_ERROR(add_batch_impl_fn_(this, batch)); + } else { + RETURN_IF_ERROR(AddBatchImpl(batch)); + } + + return Status::OK(); +} + +Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple( + const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) { + Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool); + InitAggSlots(agg_fn_evals, output_tuple); + return output_tuple; +} + +string NonGroupingAggregator::DebugString(int indentation_level) const { + stringstream ss; + DebugString(indentation_level, &ss); + return ss.str(); +} + +void NonGroupingAggregator::DebugString(int indentation_level, stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "NonGroupingAggregator(" + << "intermediate_tuple_id=" << intermediate_tuple_id_ + << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_ + << " agg_exprs=" << AggFn::DebugString(agg_fns_); + *out << ")"; +} + +Status NonGroupingAggregator::CodegenAddBatchImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) { + llvm::Function* update_tuple_fn; + RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn)); + + // Get the cross compiled update row batch function + IRFunction::Type ir_fn = IRFunction::NON_GROUPING_AGG_ADD_BATCH_IMPL; + llvm::Function* add_batch_impl_fn = codegen->GetFunction(ir_fn, true); + DCHECK(add_batch_impl_fn != nullptr); + + int replaced; + replaced = codegen->ReplaceCallSites(add_batch_impl_fn, update_tuple_fn, "UpdateTuple"); + DCHECK_GE(replaced, 1); + add_batch_impl_fn = codegen->FinalizeFunction(add_batch_impl_fn); + if (add_batch_impl_fn == nullptr) { + return Status("NonGroupingAggregator::CodegenAddBatchImpl(): codegen'd " + "AddBatchImpl() function failed verification, see log"); + } + + void** codegened_fn_ptr = reinterpret_cast<void**>(&add_batch_impl_fn_); + codegen->AddFunctionToJit(add_batch_impl_fn, codegened_fn_ptr); + return Status::OK(); +} +} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/non-grouping-aggregator.h ---------------------------------------------------------------------- diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h new file mode 100644 index 0000000..41b3e0d --- /dev/null +++ b/be/src/exec/non-grouping-aggregator.h @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H +#define IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H + +#include <memory> +#include <vector> + +#include "exec/aggregator.h" +#include "runtime/mem-pool.h" + +namespace impala { + +class AggFnEvaluator; +class DescriptorTbl; +class ExecNode; +class LlvmCodeGen; +class ObjectPool; +class RowBatch; +class RuntimeState; +class TPlanNode; +class Tuple; + +/// Aggregator for doing non-grouping aggregations. Input is passed to the aggregator +/// through AddBatch(), which generates the single output row. This Aggregator does +/// not support streaming preaggregation. +class NonGroupingAggregator : public Aggregator { + public: + NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + + virtual Status Prepare(RuntimeState* state) override; + virtual void Codegen(RuntimeState* state) override; + virtual Status Open(RuntimeState* state) override; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status Reset(RuntimeState* state) override { return Status::OK(); } + virtual void Close(RuntimeState* state) override; + + virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override; + virtual Status InputDone() override { return Status::OK(); } + + virtual int num_grouping_exprs() override { return 0; } + + /// NonGroupingAggregator doesn't create a buffer pool client so it doesn't need the + /// debug options. + virtual void SetDebugOptions(const TDebugOptions& debug_options) override {} + + virtual std::string DebugString(int indentation_level = 0) const override; + virtual void DebugString(int indentation_level, std::stringstream* out) const override; + + private: + /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the + /// pool's memory is transferred to the output batch on eos. The pool should not be + /// Reset() to allow amortizing memory allocation over a series of + /// Reset()/Open()/GetNext()* calls. + std::unique_ptr<MemPool> singleton_tuple_pool_; + + typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*); + /// Jitted AddBatchImpl function pointer. Null if codegen is disabled. + AddBatchImplFn add_batch_impl_fn_; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Result of aggregation w/o GROUP BY. + /// Note: can be NULL even if there is no grouping if the result tuple is 0 width + /// e.g. select 1 from table group by col. + Tuple* singleton_output_tuple_; + bool singleton_output_tuple_returned_; + + /// END: Members that must be Reset() + ///////////////////////////////////////// + + /// Constructs singleton output tuple, allocating memory from pool. + Tuple* ConstructSingletonOutputTuple( + const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool); + + /// Do the aggregation for all tuple rows in the batch when there is no grouping. + /// This function is replaced by codegen. + Status AddBatchImpl(RowBatch* batch) WARN_UNUSED_RESULT; + + /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'. + void GetSingletonOutput(RowBatch* row_batch); + + /// Codegen the non-streaming add row batch loop. The loop has already been compiled to + /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR. + /// This function will modify the loop subsituting the statically compiled functions + /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened + /// function. + /// Assumes AGGREGATED_ROWS = false. + Status CodegenAddBatchImpl( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; +}; +} // namespace impala + +#endif // IMPALA_EXEC_NON_GROUPING_AGGREGATOR_H http://git-wip-us.apache.org/repos/asf/impala/blob/010321d4/be/src/exec/partitioned-aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc deleted file mode 100644 index 69d297c..0000000 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ /dev/null @@ -1,253 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/partitioned-aggregation-node.h" - -#include "exec/hash-table.inline.h" -#include "exprs/agg-fn-evaluator.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream.inline.h" -#include "runtime/row-batch.h" -#include "runtime/tuple-row.h" - -using namespace impala; - -Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { - Tuple* output_tuple = singleton_output_tuple_; - FOREACH_ROW(batch, 0, batch_iter) { - UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.Get()); - } - return Status::OK(); -} - -template<bool AGGREGATED_ROWS> -Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch, - TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) { - DCHECK(!hash_partitions_.empty()); - DCHECK(!is_streaming_preagg_); - - // Make sure that no resizes will happen when inserting individual rows to the hash - // table of each partition by pessimistically assuming that all the rows in each batch - // will end up to the same partition. - // TODO: Once we have a histogram with the number of rows per partition, we will have - // accurate resize calls. - RETURN_IF_ERROR( - CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx)); - - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); - const int cache_size = expr_vals_cache->capacity(); - const int num_rows = batch->num_rows(); - for (int group_start = 0; group_start < num_rows; group_start += cache_size) { - EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx); - - FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) { - RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx)); - expr_vals_cache->NextRow(); - } - DCHECK(expr_vals_cache->AtEnd()); - } - return Status::OK(); -} - -template<bool AGGREGATED_ROWS> -void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup( - RowBatch* batch, int start_row_idx, TPrefetchMode::type prefetch_mode, - HashTableCtx* ht_ctx) { - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); - const int cache_size = expr_vals_cache->capacity(); - - expr_vals_cache->Reset(); - FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) { - TupleRow* row = batch_iter.Get(); - bool is_null; - if (AGGREGATED_ROWS) { - is_null = !ht_ctx->EvalAndHashBuild(row); - } else { - is_null = !ht_ctx->EvalAndHashProbe(row); - } - // Hoist lookups out of non-null branch to speed up non-null case. - const uint32_t hash = expr_vals_cache->CurExprValuesHash(); - const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); - HashTable* hash_tbl = GetHashTable(partition_idx); - if (is_null) { - expr_vals_cache->SetRowNull(); - } else if (prefetch_mode != TPrefetchMode::NONE) { - if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash); - } - expr_vals_cache->NextRow(); - } - - expr_vals_cache->ResetForRead(); -} - -template<bool AGGREGATED_ROWS> -Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row, - HashTableCtx* __restrict__ ht_ctx) { - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); - // Hoist lookups out of non-null branch to speed up non-null case. - const uint32_t hash = expr_vals_cache->CurExprValuesHash(); - const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); - if (expr_vals_cache->IsRowNull()) return Status::OK(); - // To process this row, we first see if it can be aggregated or inserted into this - // partition's hash table. If we need to insert it and that fails, due to OOM, we - // spill the partition. The partition to spill is not necessarily dst_partition, - // so we can try again to insert the row. - HashTable* hash_tbl = GetHashTable(partition_idx); - Partition* dst_partition = hash_partitions_[partition_idx]; - DCHECK(dst_partition != nullptr); - DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL); - if (hash_tbl == NULL) { - // This partition is already spilled, just append the row. - return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row); - } - - DCHECK(dst_partition->aggregated_row_stream->is_pinned()); - bool found; - // Find the appropriate bucket in the hash table. There will always be a free - // bucket because we checked the size above. - HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); - DCHECK(!it.AtEnd()) << "Hash table had no free buckets"; - if (AGGREGATED_ROWS) { - // If the row is already an aggregate row, it cannot match anything in the - // hash table since we process the aggregate rows first. These rows should - // have been aggregated in the initial pass. - DCHECK(!found); - } else if (found) { - // Row is already in hash table. Do the aggregation and we're done. - UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row); - return Status::OK(); - } - - // If we are seeing this result row for the first time, we need to construct the - // result row and initialize it. - return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it); -} - -template<bool AGGREGATED_ROWS> -Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__ partition, - TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) { - while (true) { - DCHECK(partition->aggregated_row_stream->is_pinned()); - Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, - partition->aggregated_row_stream.get(), &process_batch_status_); - - if (LIKELY(intermediate_tuple != NULL)) { - UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, - row, AGGREGATED_ROWS); - // After copying and initializing the tuple, insert it into the hash table. - insert_it.SetTuple(intermediate_tuple, hash); - return Status::OK(); - } else if (!process_batch_status_.ok()) { - return std::move(process_batch_status_); - } - - // We did not have enough memory to add intermediate_tuple to the stream. - RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); - if (partition->is_spilled()) { - return AppendSpilledRow<AGGREGATED_ROWS>(partition, row); - } - } -} - -Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, - TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch, - HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) { - DCHECK(is_streaming_preagg_); - DCHECK_EQ(out_batch->num_rows(), 0); - DCHECK_LE(in_batch->num_rows(), out_batch->capacity()); - - RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows()); - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); - const int num_rows = in_batch->num_rows(); - const int cache_size = expr_vals_cache->capacity(); - for (int group_start = 0; group_start < num_rows; group_start += cache_size) { - EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, ht_ctx); - - FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) { - // Hoist lookups out of non-null branch to speed up non-null case. - TupleRow* in_row = in_batch_iter.Get(); - const uint32_t hash = expr_vals_cache->CurExprValuesHash(); - const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); - if (!expr_vals_cache->IsRowNull() && - !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], - GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx], - &process_batch_status_)) { - RETURN_IF_ERROR(std::move(process_batch_status_)); - // Tuple is not going into hash table, add it to the output batch. - Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_, - out_batch->tuple_data_pool(), &process_batch_status_); - if (UNLIKELY(intermediate_tuple == NULL)) { - DCHECK(!process_batch_status_.ok()); - return std::move(process_batch_status_); - } - UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row); - out_batch_iterator.Get()->SetTuple(0, intermediate_tuple); - out_batch_iterator.Next(); - out_batch->CommitLastRow(); - } - DCHECK(process_batch_status_.ok()); - expr_vals_cache->NextRow(); - } - DCHECK(expr_vals_cache->AtEnd()); - } - if (needs_serialize) { - FOREACH_ROW(out_batch, 0, out_batch_iter) { - AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0)); - } - } - - return Status::OK(); -} - -bool PartitionedAggregationNode::TryAddToHashTable( - HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition, - HashTable* __restrict__ hash_tbl, TupleRow* __restrict__ in_row, - uint32_t hash, int* __restrict__ remaining_capacity, Status* status) { - DCHECK(remaining_capacity != NULL); - DCHECK_EQ(hash_tbl, partition->hash_tbl.get()); - DCHECK_GE(*remaining_capacity, 0); - bool found; - // This is called from ProcessBatchStreaming() so the rows are not aggregated. - HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); - Tuple* intermediate_tuple; - if (found) { - intermediate_tuple = it.GetTuple(); - } else if (*remaining_capacity == 0) { - return false; - } else { - intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, - partition->aggregated_row_stream.get(), status); - if (LIKELY(intermediate_tuple != NULL)) { - it.SetTuple(intermediate_tuple, hash); - --(*remaining_capacity); - } else { - // Avoid repeatedly trying to add tuples when under memory pressure. - *remaining_capacity = 0; - return false; - } - } - - UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row); - return true; -} - -// Instantiate required templates. -template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*, - TPrefetchMode::type, HashTableCtx*); -template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*, - TPrefetchMode::type, HashTableCtx*);
