IMPALA-4452: Always call AggFnEvaluator::Open() before AggFnEvaluator::Init()
As part of the fix for IMPALA-2379, the expression contexts of aggregation function evaluators are expected to be opened before their initFn()s are called, so that constant arguments can be accessed in initFn(). However, the legacy aggregation node wasn't updated to follow this order for the singleton result tuple (i.e. no group-by). This patch fixes the problem by deferring the creation of the singleton tuple to a point in AggregationNode::Open() after the expression contexts of all aggregate function evaluators have been opened. PartitionedAggregationNode was already updated to follow this order. This patch also fixes a minor bug in which uninitialized entries of agg_fn_ctxs_[] may be accessed in AggregationNode::Close() if AggregationNode::Prepare() fails. Change-Id: I2f261dee47821c517d8dbe1babf4112462d85807 Reviewed-on: http://gerrit.cloudera.org:8080/5049 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cac02d6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cac02d6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cac02d6b Branch: refs/heads/master Commit: cac02d6b767601ce308050f57520b65285566183 Parents: 10a4c5a Author: Michael Ho <[email protected]> Authored: Thu Nov 10 00:15:06 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Mon Nov 14 22:38:09 2016 +0000 ---------------------------------------------------------------------- be/src/exec/aggregation-node.cc | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cac02d6b/be/src/exec/aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.cc 
b/be/src/exec/aggregation-node.cc index 0607aa3..a5c7490 100644 --- a/be/src/exec/aggregation-node.cc +++ b/be/src/exec/aggregation-node.cc @@ -141,30 +141,23 @@ Status AggregationNode::Prepare(RuntimeState* state) { Expr::Prepare(build_expr_ctxs_, state, build_row_desc, expr_mem_tracker())); AddExprCtxsToFree(build_expr_ctxs_); - agg_fn_ctxs_.resize(aggregate_evaluators_.size()); int j = probe_expr_ctxs_.size(); for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) { SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; + FunctionContext* agg_fn_ctx; RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(), - intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctxs_[i])); - state->obj_pool()->Add(agg_fn_ctxs_[i]); + intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx)); + agg_fn_ctxs_.push_back(agg_fn_ctx); + state->obj_pool()->Add(agg_fn_ctx); } + DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size()); // TODO: how many buckets? hash_tbl_.reset(new OldHashTable(state, build_expr_ctxs_, probe_expr_ctxs_, vector<ExprContext*>(), 1, true, vector<bool>(build_expr_ctxs_.size(), true), id(), mem_tracker(), vector<RuntimeFilter*>(), true)); - if (probe_expr_ctxs_.empty()) { - // create single intermediate tuple now; we need to output something - // even if our input is empty - singleton_intermediate_tuple_ = ConstructIntermediateTuple(); - // Check for failures during AggFnEvaluator::Init(). 
- RETURN_IF_ERROR(state->GetQueryStatus()); - hash_tbl_->Insert(singleton_intermediate_tuple_); - output_iterator_ = hash_tbl_->Begin(); - } if (!state->codegen_enabled()) { runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"); } @@ -202,6 +195,15 @@ Status AggregationNode::Open(RuntimeState* state) { RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i])); } + if (probe_expr_ctxs_.empty()) { + // Create single intermediate tuple. This must happen after + // opening the aggregate evaluators. + singleton_intermediate_tuple_ = ConstructIntermediateTuple(); + // Check for failures during AggFnEvaluator::Init(). + RETURN_IF_ERROR(state->GetQueryStatus()); + hash_tbl_->Insert(singleton_intermediate_tuple_); + } + RETURN_IF_ERROR(children_[0]->Open(state)); RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker()); @@ -319,10 +321,11 @@ void AggregationNode::Close(RuntimeState* state) { if (hash_tbl_.get() != NULL) hash_tbl_->Close(); agg_expr_ctxs_.clear(); - DCHECK(agg_fn_ctxs_.empty() || aggregate_evaluators_.size() == agg_fn_ctxs_.size()); - for (int i = 0; i < aggregate_evaluators_.size(); ++i) { - aggregate_evaluators_[i]->Close(state); - if (!agg_fn_ctxs_.empty()) agg_fn_ctxs_[i]->impl()->Close(); + for (AggFnEvaluator* aggregate_evaluator : aggregate_evaluators_) { + aggregate_evaluator->Close(state); + } + for (FunctionContext* agg_fn_ctx : agg_fn_ctxs_) { + agg_fn_ctx->impl()->Close(); } if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
