http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 83232d2..20ee5e9 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -27,8 +27,8 @@ #include "exec/hash-table.inline.h" #include "exprs/agg-fn-evaluator.h" #include "exprs/anyval-util.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "gutil/strings/substitute.h" #include "runtime/buffered-tuple-stream.inline.h" @@ -104,9 +104,10 @@ PartitionedAggregationNode::PartitionedAggregationNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id), - intermediate_tuple_desc_(NULL), + intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)), + intermediate_row_desc_(pool->Add(new RowDescriptor(intermediate_tuple_desc_, false))), output_tuple_id_(tnode.agg_node.output_tuple_id), - output_tuple_desc_(NULL), + output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)), needs_finalize_(tnode.agg_node.need_finalize), is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation), needs_serialize_(false), @@ -137,7 +138,7 @@ PartitionedAggregationNode::PartitionedAggregationNode( partition_pool_(new ObjectPool()) { DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS); if (is_streaming_preagg_) { - DCHECK(conjunct_ctxs_.empty()) << "Preaggs have no conjuncts"; + DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts"; DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; DCHECK(limit_ == -1) << "Preaggs have no limits"; } @@ -145,24 +146,35 @@ PartitionedAggregationNode::PartitionedAggregationNode( Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, tnode.agg_node.grouping_exprs, &grouping_expr_ctxs_)); - for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { - AggFnEvaluator* evaluator; - RETURN_IF_ERROR( - AggFnEvaluator::Create(pool_, tnode.agg_node.aggregate_functions[i], &evaluator)); - aggregate_evaluators_.push_back(evaluator); - ExprContext* const* agg_expr_ctxs; - if (evaluator->input_expr_ctxs().size() > 0) { - agg_expr_ctxs = evaluator->input_expr_ctxs().data(); - } else { - // Some aggregate functions have no input expressions and therefore no ExprContext - // (e.g. count(*)). In those cases, 'agg_expr_ctxs_' will contain NULL for that - // entry. 
- DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star()); - agg_expr_ctxs = NULL; - } - agg_expr_ctxs_.push_back(agg_expr_ctxs); + + DCHECK(intermediate_tuple_desc_ != nullptr); + DCHECK(output_tuple_desc_ != nullptr); + DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size()); + const RowDescriptor& row_desc = child(0)->row_desc(); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc, + state, &grouping_exprs_)); + + // Construct build exprs from intermediate_row_desc_ + for (int i = 0; i < grouping_exprs_.size(); ++i) { + SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i]; + DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type()); + // Hack to avoid TYPE_NULL SlotRefs. + SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ? + new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN)); + build_exprs_.push_back(build_expr); + RETURN_IF_ERROR(build_expr->Init(*intermediate_row_desc_, state)); + if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i); + } + + int j = grouping_exprs_.size(); + for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) { + SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; + SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; + AggFn* agg_fn; + RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc, + *intermediate_slot_desc, *output_slot_desc, state, &agg_fn)); + agg_fns_.push_back(agg_fn); + needs_serialize_ |= agg_fn->SupportsSerialize(); } return Status::OK(); } @@ -205,60 +217,16 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) { "MaxPartitionLevel", TUnit::UNIT); } - intermediate_tuple_desc_ = - state->desc_tbl().GetTupleDescriptor(intermediate_tuple_id_); - output_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(output_tuple_id_); - DCHECK_EQ(intermediate_tuple_desc_->slots().size(), - output_tuple_desc_->slots().size()); - - RETURN_IF_ERROR(Expr::Prepare(grouping_expr_ctxs_, state, child(0)->row_desc(), - expr_mem_tracker())); - AddExprCtxsToFree(grouping_expr_ctxs_); - - // Construct build exprs from intermediate_agg_tuple_desc_ - for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) { - SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i]; - DCHECK(desc->type().type == TYPE_NULL || - desc->type() == grouping_expr_ctxs_[i]->root()->type()); - // Hack to avoid TYPE_NULL SlotRefs. - Expr* expr = desc->type().type != TYPE_NULL ? - new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN); - state->obj_pool()->Add(expr); - build_expr_ctxs_.push_back(new ExprContext(expr)); - state->obj_pool()->Add(build_expr_ctxs_.back()); - if (expr->type().IsVarLenStringType()) { - string_grouping_exprs_.push_back(i); - } - } - // Construct a new row desc for preparing the build exprs because neither the child's - // nor this node's output row desc may contain the intermediate tuple, e.g., - // in a single-node plan with an intermediate tuple different from the output tuple. 
- intermediate_row_desc_.reset(new RowDescriptor(intermediate_tuple_desc_, false)); - RETURN_IF_ERROR( - Expr::Prepare(build_expr_ctxs_, state, *intermediate_row_desc_, - expr_mem_tracker())); - AddExprCtxsToFree(build_expr_ctxs_); + RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(), + &agg_fn_evals_)); - int j = grouping_expr_ctxs_.size(); - for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) { - SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; - SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; - FunctionContext* agg_fn_ctx = NULL; - RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(), - intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx)); - agg_fn_ctxs_.push_back(agg_fn_ctx); - state->obj_pool()->Add(agg_fn_ctx); - needs_serialize_ |= aggregate_evaluators_[i]->SupportsSerialize(); - } - - if (!grouping_expr_ctxs_.empty()) { - RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_, - true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(), - MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_)); + if (!grouping_exprs_.empty()) { + RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, + grouping_exprs_, true, vector<bool>(build_exprs_.size(), true), + state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), &ht_ctx_)); RETURN_IF_ERROR(state_->block_mgr()->RegisterClient( Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this), MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_)); - RETURN_IF_ERROR(CreateHashPartitions(0)); } // TODO: Is there a need to create the stream here? If memory reservations work we may @@ -296,23 +264,18 @@ void PartitionedAggregationNode::Codegen(RuntimeState* state) { Status PartitionedAggregationNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); - - RETURN_IF_ERROR(Expr::Open(grouping_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); - - DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size()); - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { - RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i])); - } - - if (grouping_expr_ctxs_.empty()) { + if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state)); + RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state)); + if (grouping_exprs_.empty()) { // Create the single output tuple for this non-grouping agg. This must happen after // opening the aggregate evaluators. singleton_output_tuple_ = - ConstructSingletonOutputTuple(agg_fn_ctxs_, mem_pool_.get()); + ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get()); // Check for failures during AggFnEvaluator::Init(). RETURN_IF_ERROR(state_->GetQueryStatus()); singleton_output_tuple_returned_ = false; + } else { + RETURN_IF_ERROR(CreateHashPartitions(0)); } RETURN_IF_ERROR(children_[0]->Open(state)); @@ -337,7 +300,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; SCOPED_TIMER(build_timer_); - if (grouping_expr_ctxs_.empty()) { + if (grouping_exprs_.empty()) { if (process_batch_no_grouping_fn_ != NULL) { RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch)); } else { @@ -364,7 +327,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { // Done consuming child(0)'s input. 
Move all the partitions in hash_partitions_ // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in // GetNext(). - if (!grouping_expr_ctxs_.empty()) { + if (!grouping_exprs_.empty()) { RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned())); } return Status::OK(); @@ -385,16 +348,16 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, // the agg function contexts, and will be freed on the next GetNext() call by // FreeLocalAllocations(). The data either needs to be copied out now or sent up the // plan and copied out by a blocking ancestor. (See IMPALA-3311) - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { - const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc(); - DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI"; - if (!slot_desc->type().IsVarLenStringType()) continue; + for (const AggFn* agg_fn : agg_fns_) { + const SlotDescriptor& slot_desc = agg_fn->output_slot_desc(); + DCHECK(!slot_desc.type().IsCollectionType()) << "producing collections NYI"; + if (!slot_desc.type().IsVarLenStringType()) continue; if (IsInSubplan()) { // Copy string data to the row batch's pool. This is more efficient than // MarkNeedsDeepCopy() in a subplan since we are likely producing many small // batches. - RETURN_IF_ERROR(CopyStringData( - slot_desc, row_batch, first_row_idx, row_batch->tuple_data_pool())); + RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, + first_row_idx, row_batch->tuple_data_pool())); } else { row_batch->MarkNeedsDeepCopy(); break; @@ -403,14 +366,14 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, return Status::OK(); } -Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc, +Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch, int first_row_idx, MemPool* pool) { - DCHECK(slot_desc->type().IsVarLenStringType()); + DCHECK(slot_desc.type().IsVarLenStringType()); DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1); FOREACH_ROW(row_batch, first_row_idx, batch_iter) { Tuple* tuple = batch_iter.Get()->GetTuple(0); StringValue* sv = reinterpret_cast<StringValue*>( - tuple->GetSlot(slot_desc->tuple_offset())); + tuple->GetSlot(slot_desc.tuple_offset())); if (sv == NULL || sv->len == 0) continue; char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len)); if (UNLIKELY(new_ptr == NULL)) { @@ -436,7 +399,7 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, return Status::OK(); } - if (grouping_expr_ctxs_.empty()) { + if (grouping_exprs_.empty()) { // There was no grouping, so evaluate the conjuncts and return the single result row. // We allow calling GetNext() after eos, so don't return this row again. 
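// (The singleton tuple was created in Open() by ConstructSingletonOutputTuple();
// GetSingletonOutput() finalizes it via GetOutputTuple() and evaluates the
// conjuncts against the single row.)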
if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch); @@ -459,13 +422,14 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, } void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) { - DCHECK(grouping_expr_ctxs_.empty()); + DCHECK(grouping_exprs_.empty()); int row_idx = row_batch->AddRow(); TupleRow* row = row_batch->GetRow(row_idx); - Tuple* output_tuple = GetOutputTuple( - agg_fn_ctxs_, singleton_output_tuple_, row_batch->tuple_data_pool()); + Tuple* output_tuple = GetOutputTuple(agg_fn_evals_, + singleton_output_tuple_, row_batch->tuple_data_pool()); row->SetTuple(0, output_tuple); - if (ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) { + if (ExecNode::EvalConjuncts( + conjunct_evals_.data(), conjunct_evals_.size(), row)) { row_batch->CommitLastRow(); ++num_rows_returned_; COUNTER_SET(rows_returned_counter_, num_rows_returned_); @@ -511,11 +475,12 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, int row_idx = row_batch->AddRow(); TupleRow* row = row_batch->GetRow(row_idx); Tuple* intermediate_tuple = output_iterator_.GetTuple(); - Tuple* output_tuple = GetOutputTuple( - output_partition_->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool()); + Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals, + intermediate_tuple, row_batch->tuple_data_pool()); output_iterator_.Next(); row->SetTuple(0, output_tuple); - if (ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) { + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) { row_batch->CommitLastRow(); ++num_rows_returned_; if (ReachedLimit() || row_batch->AtCapacity()) { @@ -648,7 +613,7 @@ bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const { } void PartitionedAggregationNode::CleanupHashTbl( - const vector<FunctionContext*>& agg_fn_ctxs, HashTable::Iterator it) { + const vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it) { if (!needs_finalize_ && !needs_serialize_) return; // Iterate through the remaining rows in the hash table and call Serialize/Finalize on @@ -660,13 +625,13 @@ void PartitionedAggregationNode::CleanupHashTbl( dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), mem_pool_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); - AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs, tuple, dummy_dst); + AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst); it.Next(); } } else { while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); - AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs, tuple); + AggFnEvaluator::Serialize(agg_fn_evals, tuple); it.Next(); } } @@ -674,13 +639,12 @@ void PartitionedAggregationNode::CleanupHashTbl( Status PartitionedAggregationNode::Reset(RuntimeState* state) { DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation"; - if (!grouping_expr_ctxs_.empty()) { + if (!grouping_exprs_.empty()) { child_eos_ = false; partition_eos_ = false; // Reset the HT and the partitions for this grouping agg. 
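// The hash partitions are not recreated here; Open() now calls
// CreateHashPartitions(0) for grouping aggregations, so a Reset()/Open()
// cycle rebuilds them.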
ht_ctx_->set_level(0); ClosePartitions(); - RETURN_IF_ERROR(CreateHashPartitions(0)); } return ExecNode::Reset(state); } @@ -689,40 +653,34 @@ void PartitionedAggregationNode::Close(RuntimeState* state) { if (is_closed()) return; if (!singleton_output_tuple_returned_) { - DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size()); - GetOutputTuple(agg_fn_ctxs_, singleton_output_tuple_, mem_pool_.get()); + GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get()); } // Iterate through the remaining rows in the hash table and call Serialize/Finalize on // them in order to free any memory allocated by UDAs if (output_partition_ != NULL) { - CleanupHashTbl(output_partition_->agg_fn_ctxs, output_iterator_); + CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_); output_partition_->Close(false); } ClosePartitions(); child_batch_.reset(); - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { - aggregate_evaluators_[i]->Close(state); - } - agg_expr_ctxs_.clear(); - for (int i = 0; i < agg_fn_ctxs_.size(); ++i) { - agg_fn_ctxs_[i]->impl()->Close(); - } - if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll(); - if (mem_pool_.get() != NULL) mem_pool_->FreeAll(); - if (ht_ctx_.get() != NULL) ht_ctx_->Close(); - if (serialize_stream_.get() != NULL) { - serialize_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); - } - if (block_mgr_client_ != NULL) { - state->block_mgr()->ClearReservations(block_mgr_client_); - } + // Close all the agg-fn-evaluators + AggFnEvaluator::Close(agg_fn_evals_, state); - Expr::Close(grouping_expr_ctxs_, state); - Expr::Close(build_expr_ctxs_, state); + if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll(); + if (mem_pool_.get() != nullptr) mem_pool_->FreeAll(); + if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state); + ht_ctx_.reset(); + if (serialize_stream_.get() != nullptr) { + serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + } + if (block_mgr_client_ != nullptr) state->block_mgr()->ClearReservations(block_mgr_client_); + ScalarExpr::Close(grouping_exprs_); + ScalarExpr::Close(build_exprs_); + AggFn::Close(agg_fns_); ExecNode::Close(state); } @@ -732,16 +690,13 @@ PartitionedAggregationNode::Partition::~Partition() { Status PartitionedAggregationNode::Partition::InitStreams() { agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker())); - DCHECK_EQ(agg_fn_ctxs.size(), 0); - for (int i = 0; i < parent->agg_fn_ctxs_.size(); ++i) { - agg_fn_ctxs.push_back(parent->agg_fn_ctxs_[i]->impl()->Clone(agg_fn_pool.get())); - parent->partition_pool_->Add(agg_fn_ctxs[i]); - } - + DCHECK_EQ(agg_fn_evals.size(), 0); + AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), + parent->agg_fn_evals_, &agg_fn_evals); // Varlen aggregate function results are stored outside of aggregated_row_stream because // BufferedTupleStream doesn't support relocating varlen data stored in the stream. 
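// (For example, the StringValue intermediate of NDV(); see the HllUpdate IR
// example further below.)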
auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() + - parent->grouping_expr_ctxs_.size(); + parent->grouping_exprs_.size(); set<SlotId> external_varlen_slots; for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) { if ((*agg_slot)->type().IsVarLenStringType()) { @@ -813,8 +768,6 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { DCHECK(!parent->serialize_stream_->is_pinned()); DCHECK(parent->serialize_stream_->has_write_block()); - const vector<AggFnEvaluator*>& evaluators = parent->aggregate_evaluators_; - // Serialize and copy the spilled partition's stream into the new stream. Status status = Status::OK(); bool failed_to_add = false; @@ -823,7 +776,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); it.Next(); - AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, tuple); + AggFnEvaluator::Serialize(agg_fn_evals, tuple); if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) { failed_to_add = true; break; @@ -834,7 +787,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { // clean up easier (someone has to finalize this stream and we don't want to remember // where we are). if (failed_to_add) { - parent->CleanupHashTbl(agg_fn_ctxs, it); + parent->CleanupHashTbl(agg_fn_evals, it); hash_tbl->Close(); hash_tbl.reset(); aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); @@ -881,9 +834,8 @@ Status PartitionedAggregationNode::Partition::Spill() { RETURN_IF_ERROR(SerializeStreamForSpilling()); // Free the in-memory result data. - for (int i = 0; i < agg_fn_ctxs.size(); ++i) { - agg_fn_ctxs[i]->impl()->Close(); - } + AggFnEvaluator::Close(agg_fn_evals, parent->state_); + agg_fn_evals.clear(); if (agg_fn_pool.get() != NULL) { agg_fn_pool->FreeAll(); @@ -938,7 +890,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) { // We need to walk all the rows and Finalize them here so the UDA gets a chance // to cleanup. If the hash table is gone (meaning this was spilled), the rows // should have been finalized/serialized in Spill(). 
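// Serialize()/Finalize() give the UDA a chance to release memory it allocated
// through its FunctionContext, e.g. the HLL buffer behind an NDV() intermediate.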
- parent->CleanupHashTbl(agg_fn_ctxs, hash_tbl->Begin(parent->ht_ctx_.get())); + parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get())); } aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } @@ -946,23 +898,21 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) { if (unaggregated_row_stream.get() != NULL) { unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } - - for (int i = 0; i < agg_fn_ctxs.size(); ++i) { - agg_fn_ctxs[i]->impl()->Close(); - } + for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_); if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll(); } Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple( - const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool) { - DCHECK(grouping_expr_ctxs_.empty()); + const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) { + DCHECK(grouping_exprs_.empty()); Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool); - InitAggSlots(agg_fn_ctxs, output_tuple); + InitAggSlots(agg_fn_evals, output_tuple); return output_tuple; } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) noexcept { + const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool, + Status* status) noexcept { const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); const int tuple_data_size = fixed_size + varlen_size; @@ -977,12 +927,12 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data); uint8_t* varlen_data = tuple_data + fixed_size; CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size); - InitAggSlots(agg_fn_ctxs, intermediate_tuple); + InitAggSlots(agg_fn_evals, intermediate_tuple); return intermediate_tuple; } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream, + const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream, Status* status) noexcept { DCHECK(stream != NULL && status != NULL); // Allocate space for the entire tuple in the stream. @@ -1007,7 +957,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(fixed_buffer); intermediate_tuple->Init(fixed_size); CopyGroupingValues(intermediate_tuple, varlen_buffer, varlen_size); - InitAggSlots(agg_fn_ctxs, intermediate_tuple); + InitAggSlots(agg_fn_evals, intermediate_tuple); return intermediate_tuple; } @@ -1026,7 +976,7 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() { void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size) { // Copy over all grouping slots (the variable length data is copied below). - for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) { + for (int i = 0; i < grouping_exprs_.size(); ++i) { SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i]; if (ht_ctx_->ExprValueNull(i)) { intermediate_tuple->SetNull(slot_desc->null_indicator_offset()); @@ -1052,11 +1002,10 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple, // TODO: codegen this function. 
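// (UpdateTuple()/UpdateSlot() are codegen'd further below; InitAggSlots() itself
// still runs interpreted, hence the TODO.)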
void PartitionedAggregationNode::InitAggSlots( - const vector<FunctionContext*>& agg_fn_ctxs, Tuple* intermediate_tuple) { + const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) { vector<SlotDescriptor*>::const_iterator slot_desc = - intermediate_tuple_desc_->slots().begin() + grouping_expr_ctxs_.size(); - for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++slot_desc) { - AggFnEvaluator* evaluator = aggregate_evaluators_[i]; + intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size(); + for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) { // To minimize branching on the UpdateTuple path, initialize the result value so that // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can @@ -1067,18 +1016,21 @@ void PartitionedAggregationNode::InitAggSlots( // For boolean and numeric types, the default values are false/0, so the nullable // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(), // initialize the value to max/min possible value for the same effect. - evaluator->Init(agg_fn_ctxs[i], intermediate_tuple); - - AggFnEvaluator::AggregationOp agg_op = evaluator->agg_op(); - if ((agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX) - && !evaluator->intermediate_type().IsStringType() - && !evaluator->intermediate_type().IsTimestampType()) { + AggFnEvaluator* eval = agg_fn_evals[i]; + eval->Init(intermediate_tuple); + + DCHECK(agg_fns_[i] == &(eval->agg_fn())); + const AggFn* agg_fn = agg_fns_[i]; + const AggFn::AggregationOp agg_op = agg_fn->agg_op(); + if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && + !agg_fn->intermediate_type().IsStringType() && + !agg_fn->intermediate_type().IsTimestampType()) { ExprValue default_value; void* default_value_ptr = NULL; - if (evaluator->agg_op() == AggFnEvaluator::MIN) { + if (agg_op == AggFn::MIN) { default_value_ptr = default_value.SetToMax((*slot_desc)->type()); } else { - DCHECK_EQ(evaluator->agg_op(), AggFnEvaluator::MAX); + DCHECK_EQ(agg_op, AggFn::MAX); default_value_ptr = default_value.SetToMin((*slot_desc)->type()); } RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL); @@ -1086,34 +1038,34 @@ void PartitionedAggregationNode::InitAggSlots( } } -void PartitionedAggregationNode::UpdateTuple( - FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) noexcept { - DCHECK(tuple != NULL || aggregate_evaluators_.empty()); - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { +void PartitionedAggregationNode::UpdateTuple(AggFnEvaluator** agg_fn_evals, + Tuple* tuple, TupleRow* row, bool is_merge) noexcept { + DCHECK(tuple != NULL || agg_fns_.empty()); + for (int i = 0; i < agg_fns_.size(); ++i) { if (is_merge) { - aggregate_evaluators_[i]->Merge(agg_fn_ctxs[i], row->GetTuple(0), tuple); + agg_fn_evals[i]->Merge(row->GetTuple(0), tuple); } else { - aggregate_evaluators_[i]->Add(agg_fn_ctxs[i], row, tuple); + agg_fn_evals[i]->Add(row, tuple); } } } Tuple* PartitionedAggregationNode::GetOutputTuple( - const vector<FunctionContext*>& agg_fn_ctxs, Tuple* tuple, MemPool* pool) { - DCHECK(tuple != NULL || aggregate_evaluators_.empty()) << tuple; + const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) { + DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple; Tuple* dst = tuple; if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) { dst = Tuple::Create(output_tuple_desc_->byte_size(), pool); 
} if (needs_finalize_) { - AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs, tuple, dst); + AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst); } else { - AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs, tuple); + AggFnEvaluator::Serialize(agg_fn_evals, tuple); } // Copy grouping values from tuple to dst. // TODO: Codegen this. if (dst != tuple) { - int num_grouping_slots = grouping_expr_ctxs_.size(); + int num_grouping_slots = grouping_exprs_.size(); for (int i = 0; i < num_grouping_slots; ++i) { SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i]; SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i]; @@ -1155,8 +1107,8 @@ void PartitionedAggregationNode::DebugString(int indentation_level, << "intermediate_tuple_id=" << intermediate_tuple_id_ << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_ - << " grouping_exprs=" << Expr::DebugString(grouping_expr_ctxs_) - << " agg_exprs=" << AggFnEvaluator::DebugString(aggregate_evaluators_); + << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_) + << " agg_exprs=" << AggFn::DebugString(agg_fns_); ExecNode::DebugString(indentation_level, out); *out << ")"; } @@ -1326,8 +1278,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos)); RETURN_IF_ERROR( ProcessBatch<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get())); - RETURN_IF_ERROR(state_->GetQueryStatus()); - FreeLocalAllocations(); + RETURN_IF_ERROR(QueryMaintenance(state_)); batch.Reset(); } while (!eos); } @@ -1433,37 +1384,36 @@ void PartitionedAggregationNode::ClosePartitions() { } Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { - ExprContext::FreeLocalAllocations(aggregate_evaluators_[i]->input_expr_ctxs()); - } - ExprContext::FreeLocalAllocations(agg_fn_ctxs_); + AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_); for (int i = 0; i < hash_partitions_.size(); ++i) { - ExprContext::FreeLocalAllocations(hash_partitions_[i]->agg_fn_ctxs); + AggFnEvaluator::FreeLocalAllocations(hash_partitions_[i]->agg_fn_evals); } + if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations(); return ExecNode::QueryMaintenance(state); } // IR Generation for updating a single aggregation slot. 
Signature is: -// void UpdateSlot(FunctionContext* agg_fn_ctx, ExprContext* agg_expr_ctx, -// AggTuple* agg_tuple, char** row) +// void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row) // // The IR for sum(double_col), which is constructed directly with the IRBuilder, is: // -// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, -// %"class.impala::ExprContext"** %agg_expr_ctxs, -// { i8, [7 x i8], double }* %agg_tuple, %"class.impala::TupleRow"* %row) #34 { +// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval, +// <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 { // entry: -// %expr_ctx_ptr = getelementptr %"class.impala::ExprContext"*, -// %"class.impala::ExprContext"** %agg_expr_ctxs, i32 0 -// %expr_ctx = load %"class.impala::ExprContext"*, -// %"class.impala::ExprContext"** %expr_ctx_ptr -// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx, -// %"class.impala::TupleRow"* %row) -// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double }, -// { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2 +// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"** +// @_ZNK6impala14AggFnEvaluator11input_evalsEv( +// %"class.impala::AggFnEvaluator"* %agg_fn_eval) +// %0 = getelementptr %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0 +// %input_eval = load %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %0 +// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"* +// %input_eval, %"class.impala::TupleRow"* %row) +// %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>, +// <{ double, i8 }>* %agg_tuple, i32 0, i32 0 // %dst_val = load double, double* %dst_slot_ptr -// %0 = extractvalue { i8, double } %input0, 0 -// %is_null = trunc i8 %0 to i1 +// %1 = extractvalue { i8, double } %input0, 0 +// %is_null = trunc i8 %1 to i1 // br i1 %is_null, label %ret, label %not_null // // ret: ; preds = %not_null, %entry @@ -1471,83 +1421,68 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { // // not_null: ; preds = %entry // %val = extractvalue { i8, double } %input0, 1 -// %1 = fadd double %dst_val, %val -// %2 = bitcast { i8, [7 x i8], double }* %agg_tuple to i8* -// %null_byte_ptr = getelementptr i8, i8* %2, i32 0 +// %2 = fadd double %dst_val, %val +// %3 = bitcast <{ double, i8 }>* %agg_tuple to i8* +// %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8 // %null_byte = load i8, i8* %null_byte_ptr // %null_bit_cleared = and i8 %null_byte, -2 // store i8 %null_bit_cleared, i8* %null_byte_ptr -// store double %1, double* %dst_slot_ptr +// store double %2, double* %dst_slot_ptr // br label %ret // } // -// The IR for min(timestamp_col), which uses the UDA interface, is: +// The IR for ndv(timestamp_col), which uses the UDA interface, is: // -// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, -// %"class.impala::ExprContext"** %agg_expr_ctxs, -// { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple, -// %"class.impala::TupleRow"* %row) #34 { +// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval, +// <{ %"struct.impala::StringValue" }>* %agg_tuple, +// %"class.impala::TupleRow"* %row) #33 { // entry: -// %dst_lowered_ptr = alloca { i64, i64 } -// %input_lowered_ptr = alloca { i64, i64 } -// %expr_ctx_ptr = getelementptr %"class.impala::ExprContext"*, -// 
%"class.impala::ExprContext"** %agg_expr_ctxs, i32 0 -// %expr_ctx = load %"class.impala::ExprContext"*, -// %"class.impala::ExprContext"** %expr_ctx_ptr -// %input0 = call { i64, i64 } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx, -// %"class.impala::TupleRow"* %row) -// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], -// %"class.impala::TimestampValue" }, { i8, [7 x i8], -// %"class.impala::TimestampValue" }* %agg_tuple, i32 0, i32 2 -// %dst_val = load %"class.impala::TimestampValue", -// %"class.impala::TimestampValue"* %dst_slot_ptr -// %0 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8* -// %null_byte_ptr = getelementptr i8, i8* %0, i32 0 -// %null_byte = load i8, i8* %null_byte_ptr -// %null_mask = and i8 %null_byte, 1 -// %is_null = icmp ne i8 %null_mask, 0 -// %is_null_ext = zext i1 %is_null to i64 -// %1 = or i64 0, %is_null_ext -// %dst = insertvalue { i64, i64 } zeroinitializer, i64 %1, 0 -// %time_of_day = extractvalue %"class.impala::TimestampValue" %dst_val, 0, 0, 0, 0 -// %dst1 = insertvalue { i64, i64 } %dst, i64 %time_of_day, 1 -// %date = extractvalue %"class.impala::TimestampValue" %dst_val, 1, 0, 0 -// %2 = extractvalue { i64, i64 } %dst1, 0 -// %3 = zext i32 %date to i64 +// %dst_lowered_ptr = alloca { i64, i8* } +// %0 = alloca { i64, i64 } +// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"** +// @_ZNK6impala14AggFnEvaluator11input_evalsEv( +// %"class.impala::AggFnEvaluator"* %agg_fn_eval) +// %1 = getelementptr %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0 +// %input_eval = load %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %1 +// %input0 = call { i64, i64 } @GetSlotRef( +// %"class.impala::ScalarExprEvaluator"* %input_eval, +// %"class.impala::TupleRow"* %row) +// %dst_slot_ptr = getelementptr inbounds <{ %"struct.impala::StringValue" }>, +// <{ %"struct.impala::StringValue" }>* %agg_tuple, i32 0, i32 0 +// %dst_val = +// load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr +// %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0 +// %dst = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1 +// %len = extractvalue %"struct.impala::StringValue" %dst_val, 1 +// %2 = extractvalue { i64, i8* } %dst, 0 +// %3 = zext i32 %len to i64 // %4 = shl i64 %3, 32 // %5 = and i64 %2, 4294967295 // %6 = or i64 %5, %4 -// %dst2 = insertvalue { i64, i64 } %dst1, i64 %6, 0 -// store { i64, i64 } %input0, { i64, i64 }* %input_lowered_ptr -// %input_unlowered_ptr = bitcast { i64, i64 }* %input_lowered_ptr -// to %"struct.impala_udf::TimestampVal"* -// store { i64, i64 } %dst2, { i64, i64 }* %dst_lowered_ptr -// %dst_unlowered_ptr = bitcast { i64, i64 }* %dst_lowered_ptr -// to %"struct.impala_udf::TimestampVal"* -// call void -// @_ZN6impala18AggregateFunctions3MinIN10impala_udf12TimestampValEEEvPNS2_15FunctionContextERKT_PS6_.2( -// %"class.impala_udf::FunctionContext"* %agg_fn_ctx, -// %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr, -// %"struct.impala_udf::TimestampVal"* %dst_unlowered_ptr) -// %anyval_result = load { i64, i64 }, { i64, i64 }* %dst_lowered_ptr -// %7 = extractvalue { i64, i64 } %anyval_result, 1 -// %8 = insertvalue %"class.impala::TimestampValue" zeroinitializer, i64 %7, 0, 0, 0, 0 -// %9 = extractvalue { i64, i64 } %anyval_result, 0 -// %10 = ashr i64 %9, 32 -// %11 = trunc i64 %10 to i32 -// %12 = insertvalue %"class.impala::TimestampValue" %8, i32 %11, 1, 
0, 0 -// %13 = extractvalue { i64, i64 } %anyval_result, 0 -// %result_is_null = trunc i64 %13 to i1 -// %14 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8* -// %null_byte_ptr3 = getelementptr i8, i8* %14, i32 0 -// %null_byte4 = load i8, i8* %null_byte_ptr3 -// %null_bit_cleared = and i8 %null_byte4, -2 -// %15 = sext i1 %result_is_null to i8 -// %null_bit = and i8 %15, 1 -// %null_bit_set = or i8 %null_bit_cleared, %null_bit -// store i8 %null_bit_set, i8* %null_byte_ptr3 -// store %"class.impala::TimestampValue" %12, -// %"class.impala::TimestampValue"* %dst_slot_ptr +// %dst1 = insertvalue { i64, i8* } %dst, i64 %6, 0 +// %agg_fn_ctx = call %"class.impala_udf::FunctionContext"* +// @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv( +// %"class.impala::AggFnEvaluator"* %agg_fn_eval) +// store { i64, i64 } %input0, { i64, i64 }* %0 +// %input_unlowered_ptr = +// bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"* +// store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr +// %dst_unlowered_ptr = +// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"* +// call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"( +// %"class.impala_udf::FunctionContext"* %agg_fn_ctx, +// %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr, +// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr) +// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr +// %7 = extractvalue { i64, i8* } %anyval_result, 0 +// %8 = ashr i64 %7, 32 +// %9 = trunc i64 %8 to i32 +// %10 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %9, 1 +// %11 = extractvalue { i64, i8* } %anyval_result, 1 +// %12 = insertvalue %"struct.impala::StringValue" %10, i8* %11, 0 +// store %"struct.impala::StringValue" %12, %"struct.impala::StringValue"* %dst_slot_ptr // br label %ret // // ret: ; preds = %entry @@ -1555,12 +1490,9 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { // } // Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, - AggFnEvaluator* evaluator, int evaluator_idx, SlotDescriptor* slot_desc, - Function** fn) { - PointerType* fn_ctx_type = - codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME); - PointerType* expr_ctxs_type = - codegen->GetPtrPtrType(codegen->GetType(ExprContext::LLVM_CLASS_NAME)); + int agg_fn_idx, SlotDescriptor* slot_desc, Function** fn) { + PointerType* agg_fn_eval_type = + codegen->GetPtrType(AggFnEvaluator::LLVM_CLASS_NAME); StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen); if (tuple_struct == NULL) { return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to generate " @@ -1569,42 +1501,46 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct); PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); - // Create UpdateSlot prototype LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type()); - prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctx", fn_ctx_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_expr_ctxs", expr_ctxs_type)); + prototype.AddArgument( + LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); LlvmBuilder builder(codegen->context()); - Value* 
args[4]; + Value* args[3]; *fn = prototype.GeneratePrototype(&builder, &args[0]); - Value* agg_fn_ctx_arg = args[0]; - Value* agg_expr_ctxs_arg = args[1]; - Value* agg_tuple_arg = args[2]; - Value* row_arg = args[3]; - - DCHECK_GE(evaluator->input_expr_ctxs().size(), 1); + Value* agg_fn_eval_arg = args[0]; + Value* agg_tuple_arg = args[1]; + Value* row_arg = args[2]; + + // Get the vector of input expressions' evaluators. + Value* input_evals_vector = codegen->CodegenCallFunction(&builder, + IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg, + "input_evals_vector"); + + AggFn* agg_fn = agg_fns_[agg_fn_idx]; + const int num_inputs = agg_fn->GetNumChildren(); + DCHECK_GE(num_inputs, 1); vector<CodegenAnyVal> input_vals; - for (int i = 0; i < evaluator->input_expr_ctxs().size(); ++i) { - ExprContext* agg_expr_ctx = evaluator->input_expr_ctxs()[i]; - Expr* agg_expr = agg_expr_ctx->root(); - Function* agg_expr_fn; - RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(codegen, &agg_expr_fn)); - DCHECK(agg_expr_fn != NULL); - - // Call expr function with the matching expr context to get src slot value. - Value* expr_ctx_ptr = builder.CreateInBoundsGEP( - agg_expr_ctxs_arg, codegen->GetIntConstant(TYPE_INT, i), "expr_ctx_ptr"); - Value* expr_ctx = builder.CreateLoad(expr_ctx_ptr, "expr_ctx"); + for (int i = 0; i < num_inputs; ++i) { + ScalarExpr* input_expr = agg_fn->GetChild(i); + Function* input_expr_fn; + RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn)); + DCHECK(input_expr_fn != NULL); + + // Call input expr function with the matching evaluator to get src slot value. + Value* input_eval = + codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval"); string input_name = Substitute("input$0", i); - input_vals.push_back( - CodegenAnyVal::CreateCallWrapped(codegen, &builder, agg_expr->type(), agg_expr_fn, - ArrayRef<Value*>({expr_ctx, row_arg}), input_name.c_str())); + CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder, + input_expr->type(), input_expr_fn, ArrayRef<Value*>({input_eval, row_arg}), + input_name.c_str()); + input_vals.push_back(input_val); } - AggFnEvaluator::AggregationOp agg_op = evaluator->agg_op(); - const ColumnType& dst_type = evaluator->intermediate_type(); + AggFn::AggregationOp agg_op = agg_fn->agg_op(); + const ColumnType& dst_type = agg_fn->intermediate_type(); bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType() || dst_type.IsFloatingPointType() || dst_type.IsBooleanType(); bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType(); @@ -1615,23 +1551,24 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, // for special cases where we can emit a very simple instruction sequence, then fall // back to the general-purpose approach of calling the cross-compiled builtin UDA. CodegenAnyVal& src = input_vals[0]; + // 'dst_slot_ptr' points to the slot in the aggregate tuple to update. Value* dst_slot_ptr = builder.CreateStructGEP( NULL, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr"); Value* result = NULL; Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val"); - if (agg_op == AggFnEvaluator::COUNT) { + // TODO: consider moving the following codegen logic to AggFn. 
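+  // For reference, the specialized COUNT path below emits the equivalent of
+  //   if (!src.is_null) dst += (is_merge ? src.val : 1);
+  // instead of calling into the cross-compiled UDA.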
+ if (agg_op == AggFn::COUNT) { src.CodegenBranchIfNull(&builder, ret_block); - if (evaluator->is_merge()) { + if (agg_fn->is_merge()) { result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum"); } else { result = builder.CreateAdd( dst_value, codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc"); } DCHECK(!slot_desc->is_nullable()); - } else if ((agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX) - && dst_is_numeric_or_bool) { - bool is_min = agg_op == AggFnEvaluator::MIN; + } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) { + bool is_min = agg_op == AggFn::MIN; src.CodegenBranchIfNull(&builder, ret_block); Function* min_max_fn = codegen->CodegenMinMax(slot_desc->type(), is_min); Value* min_max_args[] = {dst_value, src.GetVal()}; @@ -1641,7 +1578,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, DCHECK(slot_desc->is_nullable()); slot_desc->CodegenSetNullIndicator( codegen, &builder, agg_tuple_arg, codegen->false_value()); - } else if (agg_op == AggFnEvaluator::SUM && dst_is_int_or_float_or_bool) { + } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) { src.CodegenBranchIfNull(&builder, ret_block); if (dst_type.IsFloatingPointType()) { result = builder.CreateFAdd(dst_value, src.GetVal()); @@ -1662,11 +1599,11 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster. - bool special_null_handling = !evaluator->intermediate_type().IsStringType() - && !evaluator->intermediate_type().IsTimestampType() - && (agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX - || agg_op == AggFnEvaluator::SUM || agg_op == AggFnEvaluator::AVG - || agg_op == AggFnEvaluator::NDV); + bool special_null_handling = !agg_fn->intermediate_type().IsStringType() + && !agg_fn->intermediate_type().IsTimestampType() + && (agg_op == AggFn::MIN || agg_op == AggFn::MAX + || agg_op == AggFn::SUM || agg_op == AggFn::AVG + || agg_op == AggFn::NDV); if (slot_desc->is_nullable()) { if (special_null_handling) { src.CodegenBranchIfNull(&builder, ret_block); @@ -1678,10 +1615,17 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, } dst.SetFromRawValue(dst_value); + // Get the FunctionContext object for the AggFnEvaluator. + Function* get_agg_fn_ctx_fn = + codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false); + DCHECK(get_agg_fn_ctx_fn != NULL); + Value* agg_fn_ctx_val = + builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx"); + // Call the UDA to update/merge 'src' into 'dst', with the result stored in // 'updated_dst_val'. CodegenAnyVal updated_dst_val; - RETURN_IF_ERROR(CodegenCallUda(codegen, &builder, evaluator, agg_fn_ctx_arg, + RETURN_IF_ERROR(CodegenCallUda(codegen, &builder, agg_fn, agg_fn_ctx_val, input_vals, dst, &updated_dst_val)); result = updated_dst_val.ToNativeValue(); @@ -1704,7 +1648,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, // memory/CPU usage scales super-linearly with function size. // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to // codegen because all the UpdateSlot() functions were inlined. 
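// Past the threshold, the per-row call overhead is cheap compared to the
// super-linear cost of optimizing one huge inlined function.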
- if (evaluator_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { + if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { codegen->SetNoInline(*fn); } @@ -1717,29 +1661,29 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, } Status PartitionedAggregationNode::CodegenCallUda(LlvmCodeGen* codegen, - LlvmBuilder* builder, AggFnEvaluator* evaluator, Value* agg_fn_ctx_arg, - const vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst, + LlvmBuilder* builder, AggFn* agg_fn, Value* agg_fn_ctx_val, + const vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) { - DCHECK_EQ(evaluator->input_expr_ctxs().size(), input_vals.size()); Function* uda_fn; - RETURN_IF_ERROR(evaluator->GetUpdateOrMergeFunction(codegen, &uda_fn)); + RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn)); // Set up arguments for call to UDA, which are the FunctionContext*, followed by // pointers to all input values, followed by a pointer to the destination value. vector<Value*> uda_fn_args; - uda_fn_args.push_back(agg_fn_ctx_arg); + uda_fn_args.push_back(agg_fn_ctx_val); // Create pointers to input args to pass to uda_fn. We must use the unlowered type, // e.g. IntVal, because the UDA interface expects the values to be passed as const // references to the classes. - for (int i = 0; i < evaluator->input_expr_ctxs().size(); ++i) { + DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size()); + for (int i = 0; i < input_vals.size(); ++i) { uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr")); } // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the // same reason as above. - Value* dst_lowered_ptr = dst.GetLoweredPtr("dst_lowered_ptr"); - const ColumnType& dst_type = evaluator->intermediate_type(); + Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr"); + const ColumnType& dst_type = agg_fn->intermediate_type(); Type* dst_unlowered_ptr_type = CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type); Value* dst_unlowered_ptr = builder->CreateBitCast( dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr"); @@ -1761,41 +1705,31 @@ Status PartitionedAggregationNode::CodegenCallUda(LlvmCodeGen* codegen, // For the query: // select count(*), count(int_col), sum(double_col) the IR looks like: // -// ; Function Attrs: alwaysinline // define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr, -// %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, %"class.impala::Tuple"* -// %tuple, -// %"class.impala::TupleRow"* %row, i1 %is_merge) #34 { +// %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple, +// %"class.impala::TupleRow"* %row, i1 %is_merge) #33 { // entry: -// %tuple1 = -// bitcast %"class.impala::Tuple"* %tuple to { i8, [7 x i8], i64, i64, double }* -// %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double }, -// { i8, [7 x i8], i64, i64, double }* %tuple1, i32 0, i32 2 +// %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>* +// %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>, +// <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0 // %count_star_val = load i64, i64* %src_slot // %count_star_inc = add i64 %count_star_val, 1 // store i64 %count_star_inc, i64* %src_slot -// %0 = getelementptr %"class.impala_udf::FunctionContext"*, -// %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1 -// %agg_fn_ctx = load %"class.impala_udf::FunctionContext"*, -// 
%"class.impala_udf::FunctionContext"** %0 -// %1 = call %"class.impala::ExprContext"** -// @_ZNK6impala26PartitionedAggregationNode18GetAggExprContextsEi( -// %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 1) -// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, -// %"class.impala::ExprContext"** %1, { i8, [7 x i8], i64, i64, double }* %tuple1, -// %"class.impala::TupleRow"* %row) -// %2 = getelementptr %"class.impala_udf::FunctionContext"*, -// %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2 -// %agg_fn_ctx2 = load %"class.impala_udf::FunctionContext"*, -// %"class.impala_udf::FunctionContext"** %2 -// %3 = call %"class.impala::ExprContext"** -// @_ZNK6impala26PartitionedAggregationNode18GetAggExprContextsEi( -// %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 2) -// call void @UpdateSlot.4(%"class.impala_udf::FunctionContext"* %agg_fn_ctx2, -// %"class.impala::ExprContext"** %3, { i8, [7 x i8], i64, i64, double }* %tuple1, -// %"class.impala::TupleRow"* %row) +// %0 = getelementptr %"class.impala::AggFnEvaluator"*, +// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1 +// %agg_fn_eval = +// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0 +// call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval, +// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row) +// %1 = getelementptr %"class.impala::AggFnEvaluator"*, +// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2 +// %agg_fn_eval2 = +// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1 +// call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2, +// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row) // ret void // } +// Status PartitionedAggregationNode::CodegenUpdateTuple( LlvmCodeGen* codegen, Function** fn) { SCOPED_TIMER(codegen->codegen_timer()); @@ -1814,12 +1748,11 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( // Get the types to match the UpdateTuple signature Type* agg_node_type = codegen->GetType(PartitionedAggregationNode::LLVM_CLASS_NAME); - Type* fn_ctx_type = codegen->GetType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME); Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME); Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type); - PointerType* fn_ctx_ptr_ptr_type = codegen->GetPtrPtrType(fn_ctx_type); + PointerType* evals_type = codegen->GetPtrPtrType(AggFnEvaluator::LLVM_CLASS_NAME); PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_type); PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type); @@ -1827,7 +1760,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct); LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type()); prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctxs", fn_ctx_ptr_ptr_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->boolean_type())); @@ -1835,8 +1768,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( LlvmBuilder builder(codegen->context()); Value* 
args[5]; *fn = prototype.GeneratePrototype(&builder, &args[0]); - Value* this_arg = args[0]; - Value* agg_fn_ctxs_arg = args[1]; + Value* agg_fn_evals_arg = args[1]; Value* tuple_arg = args[2]; Value* row_arg = args[3]; @@ -1844,17 +1776,13 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( // TODO: get rid of this by using right type in function signature tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple"); - Function* get_expr_ctxs_fn = - codegen->GetFunction(IRFunction::PART_AGG_NODE_GET_EXPR_CTXS, false); - DCHECK(get_expr_ctxs_fn != NULL); - // Loop over each expr and generate the IR for that slot. If the expr is not // count(*), generate a helper IR function to update the slot and call that. - int j = grouping_expr_ctxs_.size(); - for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) { + int j = grouping_exprs_.size(); + for (int i = 0; i < agg_fns_.size(); ++i, ++j) { SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j]; - AggFnEvaluator* evaluator = aggregate_evaluators_[i]; - if (evaluator->is_count_star()) { + AggFn* agg_fn = agg_fns_[i]; + if (agg_fn->is_count_star()) { // TODO: we should be able to hoist this up to the loop over the batch and just // increment the slot by the number of rows in the batch. int field_idx = slot_desc->llvm_field_idx(); @@ -1865,15 +1793,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( builder.CreateStore(count_inc, slot_ptr); } else { Function* update_slot_fn; - RETURN_IF_ERROR( - CodegenUpdateSlot(codegen, evaluator, i, slot_desc, &update_slot_fn)); - Value* agg_fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i); - Value* agg_fn_ctx = builder.CreateLoad(agg_fn_ctx_ptr, "agg_fn_ctx"); - // Call GetExprCtx() to get the expression context. - DCHECK(agg_expr_ctxs_[i] != NULL); - Value* get_expr_ctxs_args[] = {this_arg, codegen->GetIntConstant(TYPE_INT, i)}; - Value* agg_expr_ctxs = builder.CreateCall(get_expr_ctxs_fn, get_expr_ctxs_args); - Value* update_slot_args[] = {agg_fn_ctx, agg_expr_ctxs, tuple_arg, row_arg}; + RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn)); + + // Load agg_fn_evals_[i] + Value* agg_fn_eval_val = + codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval"); + + // Call UpdateSlot(agg_fn_evals_[i], tuple, row); + Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg}; builder.CreateCall(update_slot_fn, update_slot_args); } } @@ -1881,7 +1808,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple( // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get // any benefit from it since the function call overhead will be amortized. - if (aggregate_evaluators_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { + if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { codegen->SetNoInline(*fn); } @@ -1902,14 +1829,14 @@ Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen, RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn)); // Get the cross compiled update row batch function - IRFunction::Type ir_fn = (!grouping_expr_ctxs_.empty() ? + IRFunction::Type ir_fn = (!grouping_exprs_.empty() ? 
IRFunction::PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED : IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING); Function* process_batch_fn = codegen->GetFunction(ir_fn, true); DCHECK(process_batch_fn != NULL); int replaced; - if (!grouping_expr_ctxs_.empty()) { + if (!grouping_exprs_.empty()) { // Codegen for grouping using hash table // Replace prefetch_mode with constant so branches can be optimised out. @@ -1960,7 +1887,7 @@ Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen, "ProcessBatch() function failed verification, see log"); } - void **codegened_fn_ptr = grouping_expr_ctxs_.empty() ? + void **codegened_fn_ptr = grouping_exprs_.empty() ? reinterpret_cast<void**>(&process_batch_no_grouping_fn_) : reinterpret_cast<void**>(&process_batch_fn_); codegen->AddFunctionToJit(process_batch_fn, codegened_fn_ptr);
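The CodegenUpdateTuple() changes above simplify the generated UpdateTuple()'s calling convention: instead of calling back into the node through GetAggExprContexts() to fetch ExprContext arrays, the IR now indexes a flat AggFnEvaluator** argument with a plain GEP and load, as the IR comment at the top of this diff shows. The following standalone C++ toy mirrors that convention; every type below is a simplified stand-in for illustration, not the real Impala class:

    // Toy model of the post-patch UpdateTuple() calling convention: the
    // evaluator array is passed in directly and indexed per aggregate function.
    #include <cstdio>

    struct Tuple { long count = 0; double sum = 0.0; };  // stand-in for impala::Tuple
    struct TupleRow { double val; };                     // stand-in for impala::TupleRow

    struct AggFnEvaluator {                              // stand-in interface
      virtual ~AggFnEvaluator() = default;
      virtual void Add(const TupleRow* row, Tuple* dst) = 0;
    };

    struct CountEval : AggFnEvaluator {
      void Add(const TupleRow*, Tuple* dst) override { ++dst->count; }
    };

    struct SumEval : AggFnEvaluator {
      void Add(const TupleRow* row, Tuple* dst) override { dst->sum += row->val; }
    };

    // Mirrors the new signature: 'agg_fn_evals' is a raw pointer array, so a
    // codegen'd caller can reach evaluator i with a simple GEP + load, matching
    // the "%0 = getelementptr ... %agg_fn_evals, i32 1" lines in the IR comment.
    void UpdateTuple(AggFnEvaluator** agg_fn_evals, int num_fns, Tuple* tuple,
        TupleRow* row) {
      for (int i = 0; i < num_fns; ++i) agg_fn_evals[i]->Add(row, tuple);
    }

    int main() {
      CountEval count_fn;
      SumEval sum_fn;
      AggFnEvaluator* evals[] = {&count_fn, &sum_fn};
      Tuple tuple;
      TupleRow rows[] = {{1.5}, {2.5}};
      for (TupleRow& row : rows) UpdateTuple(evals, 2, &tuple, &row);
      std::printf("count=%ld sum=%.1f\n", tuple.count, tuple.sum);  // count=2 sum=4.0
      return 0;
    }

The agg_fn_evals[i] indexing here corresponds to the getelementptr/load pairs in the IR comment; passing the array directly is what removes the need for the GetAggExprContexts() accessor deleted from the header below.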
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 2155473..ccac45b 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -37,6 +37,7 @@ class Value; namespace impala { +class AggFn; class AggFnEvaluator; class CodegenAnyVal; class LlvmCodeGen; @@ -139,7 +140,7 @@ class PartitionedAggregationNode : public ExecNode { static const char* LLVM_CLASS_NAME; protected: - /// Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs + /// Frees local allocations from aggregate_evals_ and agg_fn_evals virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; @@ -180,7 +181,11 @@ class PartitionedAggregationNode : public ExecNode { TupleDescriptor* intermediate_tuple_desc_; /// Row with the intermediate tuple as its only tuple. - boost::scoped_ptr<RowDescriptor> intermediate_row_desc_; + /// Construct a new row desc for preparing the build exprs because neither the child's + /// nor this node's output row desc may contain the intermediate tuple, e.g., + /// in a single-node plan with an intermediate tuple different from the output tuple. + /// Lives in the query state's obj_pool. + RowDescriptor* intermediate_row_desc_; /// Tuple into which Finalize() results are stored. Possibly the same as /// the intermediate tuple. @@ -195,39 +200,32 @@ class PartitionedAggregationNode : public ExecNode { /// True if this is the first phase of a two-phase distributed aggregation for which we /// are doing a streaming preaggregation. - bool is_streaming_preagg_; + const bool is_streaming_preagg_; /// True if any of the evaluators require the serialize step. bool needs_serialize_; /// The list of all aggregate operations for this exec node. - std::vector<AggFnEvaluator*> aggregate_evaluators_; - - /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed - /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime. - /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no Expr - /// in the aggregate evaluator (e.g. count(*)). - std::vector<ExprContext* const*> agg_expr_ctxs_; - - /// FunctionContext for each aggregate function and backing MemPool. String data - /// returned by the aggregate functions is allocated via these contexts. - /// These contexts are only passed to the evaluators in the non-partitioned - /// (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the - /// partitions. + std::vector<AggFn*> agg_fns_; + + /// Evaluators for each aggregate function and backing MemPool. String data + /// returned by the aggregate functions is allocated via these evaluators. + /// These evaluators are only used for the non-grouping cases. For queries + /// with the group-by clause, each partition will clone these evaluators. /// TODO: we really need to plumb through CHAR(N) for intermediate types. - std::vector<impala_udf::FunctionContext*> agg_fn_ctxs_; + std::vector<AggFnEvaluator*> agg_fn_evals_; boost::scoped_ptr<MemPool> agg_fn_pool_; /// Exprs used to evaluate input rows - std::vector<ExprContext*> grouping_expr_ctxs_; + std::vector<ScalarExpr*> grouping_exprs_; /// Exprs used to insert constructed aggregation tuple into the hash table.
/// All the exprs are simply SlotRefs for the intermediate tuple. - std::vector<ExprContext*> build_expr_ctxs_; + std::vector<ScalarExpr*> build_exprs_; - /// Indices of grouping exprs with var-len string types in grouping_expr_ctxs_. We need - /// to do more work for var-len expressions when allocating and spilling rows. All - /// var-len grouping exprs have type string. + /// Indices of grouping exprs with var-len string types in grouping_exprs_. + /// We need to do more work for var-len expressions when allocating and spilling rows. + /// All var-len grouping exprs have type string. std::vector<int> string_grouping_exprs_; RuntimeState* state_; @@ -325,6 +323,8 @@ class PartitionedAggregationNode : public ExecNode { bool child_eos_; /// Used for hash-related functionality, such as evaluating rows and calculating hashes. + /// It also owns the evaluators for the grouping and build expressions used during hash + /// table insertion and probing. boost::scoped_ptr<HashTableCtx> ht_ctx_; /// Object pool that holds the Partition objects in hash_partitions_. @@ -395,8 +395,8 @@ class PartitionedAggregationNode : public ExecNode { /// is spilled). boost::scoped_ptr<HashTable> hash_tbl; - /// Clone of parent's agg_fn_ctxs_ and backing MemPool. - std::vector<impala_udf::FunctionContext*> agg_fn_ctxs; + /// Clone of parent's agg_fn_evals_ and backing MemPool. + std::vector<AggFnEvaluator*> agg_fn_evals; boost::scoped_ptr<MemPool> agg_fn_pool; /// Tuple stream used to store aggregated rows. When the partition is not spilled, @@ -432,54 +432,53 @@ class PartitionedAggregationNode : public ExecNode { /// Copies string data from the specified slot into 'pool', and sets the StringValues' /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type. - Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch, + Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch, int first_row_idx, MemPool* pool); /// Constructs singleton output tuple, allocating memory from pool. Tuple* ConstructSingletonOutputTuple( - const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool); + const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool); /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_' - /// using 'grouping_expr_ctxs_'. Aggregation expr slots are set to their initial values. - /// Returns NULL if there was not enough memory to allocate the tuple or an error - /// occurred. When returning NULL, sets *status. Allocates tuple and var-len data for + /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial + /// values. Returns NULL if there was not enough memory to allocate the tuple or an + /// error occurred, in which case 'status' is set. Allocates tuple and var-len data for /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the /// FunctionContexts, so is stored outside the stream. If stream's small buffers get /// full, it will attempt to switch to IO-buffers. - Tuple* ConstructIntermediateTuple( - const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, + Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream, Status* status) noexcept; /// Constructs intermediate tuple, allocating memory from pool instead of the stream. /// Returns NULL and sets status if there is not enough memory to allocate the tuple.
- Tuple* ConstructIntermediateTuple( - const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool, - Status* status) noexcept; + Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, + MemPool* pool, Status* status) noexcept; /// Returns the number of bytes of variable-length data for the grouping values stored /// in 'ht_ctx_'. int GroupingExprsVarlenSize(); /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_' that - /// that were computed over 'current_row_' using 'grouping_expr_ctxs_'. Writes the + /// were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the /// var-len data into buffer. 'buffer' points to the start of a buffer of at least the /// size of the variable-length data: 'varlen_size'. void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size); /// Initializes the aggregate function slots of an intermediate tuple. /// Any var-len data is allocated from the FunctionContexts. - void InitAggSlots(const vector<impala_udf::FunctionContext*>& agg_fn_ctxs, + void InitAggSlots(const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple); /// Updates the given aggregation intermediate tuple with aggregation values computed - /// over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or + /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing /// in is_merge == true. The override is needed to merge spilled and non-spilled rows /// belonging to the same partition independent of whether the agg fn evaluators have /// is_merge() == true. /// This function is replaced by codegen (which is why we don't use a vector argument - /// for agg_fn_ctxs).. Any var-len data is allocated from the FunctionContexts. + /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts. + /// TODO: Fix the argument order. Need to update CodegenUpdateTuple() too. + void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge = false) noexcept; /// Called on the intermediate tuple of each group after all input rows have been @@ -491,7 +490,7 @@ class PartitionedAggregationNode : public ExecNode { /// the finalized/serialized aggregate values is returned. /// TODO: Coordinate the allocation of new tuples with the release of memory /// so as not to make memory consumption blow up. - Tuple* GetOutputTuple(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, + Tuple* GetOutputTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool); /// Do the aggregation for all tuple rows in the batch when there is no grouping. @@ -527,11 +526,6 @@ class PartitionedAggregationNode : public ExecNode { template <bool AGGREGATED_ROWS> Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx); - /// Accessor for the expression contexts of an AggFnEvaluator. Returns an array of - /// pointers the the AggFnEvaluator's expression contexts. Used only in codegen'ed - /// version of UpdateTuple(). - ExprContext* const* IR_ALWAYS_INLINE GetAggExprContexts(int i) const; - /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is /// the context for the partition's hash table and hash is the precomputed hash of
/// the row. The row can be an unaggregated or aggregated row depending on @@ -637,23 +631,23 @@ class PartitionedAggregationNode : public ExecNode { void ClosePartitions(); /// Calls Finalize() on all tuples starting at 'it'. - void CleanupHashTbl(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, + void CleanupHashTbl(const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it); - /// Codegen UpdateSlot(). Returns non-OK status if codegen is unsuccessful. - /// Assumes is_merge = false; - Status CodegenUpdateSlot(LlvmCodeGen* codegen, AggFnEvaluator* evaluator, - int evaluator_idx, SlotDescriptor* slot_desc, llvm::Function** fn); + /// Codegens the update of aggregate expression agg_fns_[agg_fn_idx] + /// and returns the IR function in 'fn'. Returns non-OK status if codegen + /// is unsuccessful. + Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx, + SlotDescriptor* slot_desc, llvm::Function** fn); /// Codegen a call to a function implementing the UDA interface with input values /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate /// function, and 'updated_dst_val' is set to the new value after the Update or Merge /// operation is applied. The instruction sequence for the UDA call is inserted at /// the insert position of 'builder'. - Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, - AggFnEvaluator* evaluator, llvm::Value* agg_fn_ctx_arg, - const std::vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst_val, - CodegenAnyVal* updated_dst_val); + Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn, + llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals, + const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val); /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful. Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc index be980c7..e5f649e 100644 --- a/be/src/exec/partitioned-hash-join-builder-ir.cc +++ b/be/src/exec/partitioned-hash-join-builder-ir.cc @@ -59,7 +59,7 @@ Status PhjBuilder::ProcessBuildBatch( DCHECK_EQ(ctx->level(), 0) << "Runtime filters should not be built during repartitioning."; // TODO: unroll loop and codegen expr evaluation and hashing (IMPALA-3360).
- for (const FilterContext& ctx : filters_) ctx.Insert(build_row); + for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row); } const uint32_t hash = expr_vals_cache->CurExprValuesHash(); const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 57a690c..a17e295 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -23,8 +23,8 @@ #include "codegen/llvm-codegen.h" #include "exec/hash-table.inline.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/buffered-tuple-stream.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" @@ -75,31 +75,41 @@ PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op, insert_batch_fn_(NULL), insert_batch_fn_level0_(NULL) {} -Status PhjBuilder::Init(RuntimeState* state, +Status PhjBuilder::Init(const vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) { + return Status::OK(); +} + +Status PhjBuilder::InitExprsAndFilters(RuntimeState* state, const vector<TEqJoinCondition>& eq_join_conjuncts, - const vector<TRuntimeFilterDesc>& filters) { + const vector<TRuntimeFilterDesc>& filter_descs) { for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) { - ExprContext* ctx; - RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, eq_join_conjunct.right, &ctx)); - build_expr_ctxs_.push_back(ctx); + ScalarExpr* build_expr; + RETURN_IF_ERROR( + ScalarExpr::Create(eq_join_conjunct.right, row_desc_, state, &build_expr)); + build_exprs_.push_back(build_expr); is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from); } - for (const TRuntimeFilterDesc& filter : filters) { - // If filter propagation not enabled, only consider building broadcast joins (that may - // be consumed by this fragment). - if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL - && !filter.is_broadcast_join) { + for (const TRuntimeFilterDesc& filter_desc : filter_descs) { + // If filter propagation is not enabled, only consider building broadcast joins + // (that may be consumed by this fragment). + if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL && + !filter_desc.is_broadcast_join) { continue; } - if (state->query_options().disable_row_runtime_filtering - && !filter.applied_on_partition_columns) { + if (state->query_options().disable_row_runtime_filtering && + !filter_desc.applied_on_partition_columns) { continue; } - FilterContext filter_ctx; - filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true); - RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, filter.src_expr, &filter_ctx.expr_ctx)); - filters_.push_back(filter_ctx); + ScalarExpr* filter_expr; + RETURN_IF_ERROR( + ScalarExpr::Create(filter_desc.src_expr, row_desc_, state, &filter_expr)); + filter_exprs_.push_back(filter_expr); + + // TODO: Move to Prepare().
+ filter_ctxs_.emplace_back(); + filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true); } return Status::OK(); } @@ -108,21 +118,25 @@ string PhjBuilder::GetName() { return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_); } +void PhjBuilder::FreeLocalAllocations() const { + if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations(); + for (const FilterContext& ctx : filter_ctxs_) { + if (ctx.expr_eval != nullptr) ctx.expr_eval->FreeLocalAllocations(); + } +} + Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); - RETURN_IF_ERROR( - Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - expr_ctxs_to_free_.insert( - expr_ctxs_to_free_.end(), build_expr_ctxs_.begin(), build_expr_ctxs_.end()); - - for (const FilterContext& ctx : filters_) { - RETURN_IF_ERROR(ctx.expr_ctx->Prepare(state, row_desc_, expr_mem_tracker_.get())); - expr_ctxs_to_free_.push_back(ctx.expr_ctx); - } - RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_, + RETURN_IF_ERROR(HashTableCtx::Create(&pool_, state, build_exprs_, build_exprs_, HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(), - MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_.get(), + MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), expr_mem_pool(), &ht_ctx_)); + + DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size()); + for (int i = 0; i < filter_exprs_.size(); ++i) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &pool_, + expr_mem_pool(), &filter_ctxs_[i].expr_eval)); + } RETURN_IF_ERROR(state->block_mgr()->RegisterClient( Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this), MinRequiredBuffers(), true, mem_tracker_.get(), state, &block_mgr_client_)); @@ -150,9 +164,10 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) } Status PhjBuilder::Open(RuntimeState* state) { - RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); - for (const FilterContext& filter : filters_) { - RETURN_IF_ERROR(filter.expr_ctx->Open(state)); + RETURN_IF_ERROR(ht_ctx_->Open(state)); + + for (const FilterContext& ctx : filter_ctxs_) { + RETURN_IF_ERROR(ctx.expr_eval->Open(state)); } RETURN_IF_ERROR(CreateHashPartitions(0)); AllocateRuntimeFilters(); @@ -165,7 +180,7 @@ Status PhjBuilder::Open(RuntimeState* state) { Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) { SCOPED_TIMER(partition_build_rows_timer_); - bool build_filters = ht_ctx_->level() == 0 && filters_.size() > 0; + bool build_filters = ht_ctx_->level() == 0 && filter_ctxs_.size() > 0; if (process_build_batch_fn_ == NULL) { RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters)); } else { @@ -179,7 +194,7 @@ Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) { } // Free any local allocations made during partitioning. 
- ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); + FreeLocalAllocations(); COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows()); return Status::OK(); } @@ -225,19 +240,23 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) { void PhjBuilder::Close(RuntimeState* state) { if (closed_) return; - ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); + FreeLocalAllocations(); CloseAndDeletePartitions(); - if (ht_ctx_ != NULL) ht_ctx_->Close(); - Expr::Close(build_expr_ctxs_, state); - for (const FilterContext& ctx : filters_) ctx.expr_ctx->Close(state); + if (ht_ctx_ != nullptr) ht_ctx_->Close(state); + ht_ctx_.reset(); + for (const FilterContext& ctx : filter_ctxs_) { + if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state); + } + ScalarExpr::Close(filter_exprs_); if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_); + ScalarExpr::Close(build_exprs_); pool_.Clear(); DataSink::Close(state); closed_ = true; } void PhjBuilder::Reset() { - ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); + FreeLocalAllocations(); non_empty_build_ = false; CloseAndDeletePartitions(); } @@ -448,13 +467,13 @@ void PhjBuilder::CloseAndDeletePartitions() { } void PhjBuilder::AllocateRuntimeFilters() { - DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0) + DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filter_ctxs_.size() == 0) << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN"; DCHECK(ht_ctx_ != NULL); - for (int i = 0; i < filters_.size(); ++i) { - filters_[i].local_bloom_filter = + for (int i = 0; i < filter_ctxs_.size(); ++i) { + filter_ctxs_[i].local_bloom_filter = runtime_state_->filter_bank()->AllocateScratchBloomFilter( - filters_[i].filter->id()); + filter_ctxs_[i].filter->id()); } } @@ -467,7 +486,7 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { // poor estimate of the NDV - particularly if the filter expression is a function of // several columns. // TODO: Better heuristic. - for (const FilterContext& ctx : filters_) { + for (const FilterContext& ctx : filter_ctxs_) { // TODO: Consider checking actual number of bits set in filter to compute FP rate. // TODO: Consider checking this every few batches or so. bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh( @@ -478,15 +497,15 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { num_enabled_filters += !fp_rate_too_high; } - if (filters_.size() > 0) { + if (filter_ctxs_.size() > 0) { string info_string; - if (num_enabled_filters == filters_.size()) { - info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(), - filters_.size() == 1 ? "" : "s"); + if (num_enabled_filters == filter_ctxs_.size()) { + info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs_.size(), + filter_ctxs_.size() == 1 ? "" : "s"); } else { info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled", - num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s", - filters_.size() - num_enabled_filters); + num_enabled_filters, filter_ctxs_.size(), filter_ctxs_.size() == 1 ? "" : "s", + filter_ctxs_.size() - num_enabled_filters); } profile()->AddInfoString("Runtime filters", info_string); } @@ -689,7 +708,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->GetQueryStatus()); // Free any local allocations made while inserting. 
- ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_); + parent_->FreeLocalAllocations(); batch.Reset(); } while (!eos); @@ -791,9 +810,9 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, // Always build runtime filters at level0 (if there are any). // Note that the first argument of this function is the return value. - Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4); - build_filters_l0_arg->replaceAllUsesWith( - ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0)); + Value* build_filter_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4); + build_filter_l0_arg->replaceAllUsesWith( + ConstantInt::get(Type::getInt1Ty(codegen->context()), filter_ctxs_.size() > 0)); // process_build_batch_fn_level0 uses CRC hash if available, replaced = @@ -808,8 +827,8 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, // Never build filters after repartitioning, as all rows have already been added to the // filters during the level0 build. Note that the first argument of this function is the // return value. - Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4); - build_filters_arg->replaceAllUsesWith( + Value* build_filter_arg = codegen->GetArgument(process_build_batch_fn, 4); + build_filter_arg->replaceAllUsesWith( ConstantInt::get(Type::getInt1Ty(codegen->context()), false)); // Finalize ProcessBuildBatch functions
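The PhjBuilder changes above complete the same ScalarExpr/ScalarExprEvaluator split used in the aggregation node: InitExprsAndFilters() builds stateless ScalarExpr trees from thrift, Prepare() creates one ScalarExprEvaluator per expr backed by expr_mem_pool(), Open() opens the evaluators, FreeLocalAllocations() replaces the old ExprContext::FreeLocalAllocations(expr_ctxs_to_free_) bookkeeping, and Close() tears down the evaluators before calling ScalarExpr::Close() on the shared exprs. A standalone toy sketch of why that split works; the classes below are simplified stand-ins for illustration, not the actual Impala implementations:

    // Toy model of the ScalarExpr / ScalarExprEvaluator split: one shared,
    // stateless expr tree; many evaluators, each owning its own scratch state.
    #include <cstdio>
    #include <vector>

    struct ScalarExpr {            // stand-in: immutable once constructed
      double multiplier;
      explicit ScalarExpr(double m) : multiplier(m) {}
    };

    struct ScalarExprEvaluator {   // stand-in: per-instance mutable state
      const ScalarExpr& expr;
      std::vector<double> local_allocations;  // scratch freed between batches
      explicit ScalarExprEvaluator(const ScalarExpr& e) : expr(e) {}
      double GetValue(double input) {
        local_allocations.push_back(input);   // pretend scratch allocation
        return input * expr.multiplier;
      }
      void FreeLocalAllocations() { local_allocations.clear(); }
    };

    int main() {
      ScalarExpr expr(2.0);              // built once, as in InitExprsAndFilters()
      ScalarExprEvaluator eval_a(expr);  // e.g. a FilterContext's expr_eval
      ScalarExprEvaluator eval_b(expr);  // e.g. an evaluator owned by ht_ctx_
      std::printf("%.1f %.1f\n", eval_a.GetValue(3.0), eval_b.GetValue(4.0));
      eval_a.FreeLocalAllocations();     // per-batch maintenance, as in Send()
      eval_b.FreeLocalAllocations();
      return 0;
    }

Because the expr tree holds no per-query state, a single tree can back many evaluators, which is why the hunks above can let ht_ctx_ own the evaluators for the shared build exprs, and why the aggregation node's partitions can clone agg_fn_evals_ while sharing agg_fns_.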
