http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/unnest-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc index bf467bd..5bde655 100644 --- a/be/src/exec/unnest-node.cc +++ b/be/src/exec/unnest-node.cc @@ -18,7 +18,7 @@ #include "common/status.h" #include "exec/unnest-node.h" #include "exec/subplan-node.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" @@ -33,31 +33,40 @@ UnnestNode::UnnestNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), item_byte_size_(0), - coll_expr_ctx_(NULL), - coll_slot_desc_(NULL), + thrift_coll_expr_(tnode.unnest_node.collection_expr), + coll_expr_(nullptr), + coll_expr_eval_(nullptr), + coll_slot_desc_(nullptr), coll_tuple_idx_(-1), - coll_value_(NULL), + coll_value_(nullptr), item_idx_(0), num_collections_(0), total_collection_size_(0), max_collection_size_(-1), min_collection_size_(-1), - avg_collection_size_counter_(NULL), - max_collection_size_counter_(NULL), - min_collection_size_counter_(NULL), - num_collections_counter_(NULL) { + avg_collection_size_counter_(nullptr), + max_collection_size_counter_(nullptr), + min_collection_size_counter_(nullptr), + num_collections_counter_(nullptr) { } Status UnnestNode::Init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK(tnode.__isset.unnest_node); - Expr::CreateExprTree(pool_, tnode.unnest_node.collection_expr, &coll_expr_ctx_); + RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + return Status::OK(); +} + +Status UnnestNode::InitCollExpr(RuntimeState* state) { + DCHECK(containing_subplan_ != nullptr) + << "set_containing_subplan() must have been called"; + const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc(); + 
RETURN_IF_ERROR(ScalarExpr::Create(thrift_coll_expr_, row_desc, state, &coll_expr_)); + DCHECK(coll_expr_->IsSlotRef()); return Status::OK(); } Status UnnestNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); - DCHECK(containing_subplan_ != NULL) << "set_containing_subplan() must be called"; RETURN_IF_ERROR(ExecNode::Prepare(state)); avg_collection_size_counter_ = @@ -71,17 +80,18 @@ Status UnnestNode::Prepare(RuntimeState* state) { DCHECK_EQ(1, row_desc().tuple_descriptors().size()); const TupleDescriptor* item_tuple_desc = row_desc().tuple_descriptors()[0]; - DCHECK(item_tuple_desc != NULL); + DCHECK(item_tuple_desc != nullptr); item_byte_size_ = item_tuple_desc->byte_size(); - RETURN_IF_ERROR(coll_expr_ctx_->Prepare( - state, containing_subplan_->child(0)->row_desc(), expr_mem_tracker())); + + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_, + expr_mem_pool(), &coll_expr_eval_)); // Set the coll_slot_desc_ and the corresponding tuple index used for manually // evaluating the collection SlotRef and for projection. 
- DCHECK(coll_expr_ctx_->root()->is_slotref()); - const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_ctx_->root()); + DCHECK(coll_expr_->IsSlotRef()); + const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_); coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id()); - DCHECK(coll_slot_desc_ != NULL); + DCHECK(coll_slot_desc_ != nullptr); const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc(); coll_tuple_idx_ = row_desc.GetTupleIdx(coll_slot_desc_->parent()->id()); @@ -91,17 +101,17 @@ Status UnnestNode::Prepare(RuntimeState* state) { Status UnnestNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); - RETURN_IF_ERROR(coll_expr_ctx_->Open(state)); + RETURN_IF_ERROR(coll_expr_eval_->Open(state)); - DCHECK(containing_subplan_->current_row() != NULL); + DCHECK(containing_subplan_->current_row() != nullptr); Tuple* tuple = containing_subplan_->current_input_row_->GetTuple(coll_tuple_idx_); - if (tuple != NULL) { + if (tuple != nullptr) { // Retrieve the collection value to be unnested directly from the tuple. We purposely // ignore the null bit of the slot because we may have set it in a previous Open() of // this same unnest node for projection. coll_value_ = reinterpret_cast<const CollectionValue*>( tuple->GetSlot(coll_slot_desc_->tuple_offset())); - // Projection: Set the slot containing the collection value to NULL. + // Projection: Set the slot containing the collection value to nullptr. tuple->SetNull(coll_slot_desc_->null_indicator_offset()); } else { coll_value_ = &EMPTY_COLLECTION_VALUE; @@ -134,7 +144,7 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) *eos = false; // Populate the output row_batch with tuples from the collection. 
- DCHECK(coll_value_ != NULL); + DCHECK(coll_value_ != nullptr); DCHECK_GE(coll_value_->num_tuples, 0); while (item_idx_ < coll_value_->num_tuples) { Tuple* item = @@ -144,7 +154,8 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) TupleRow* row = row_batch->GetRow(row_idx); row->SetTuple(0, item); // TODO: Ideally these should be evaluated by the parent scan node. - if (EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) { + DCHECK_EQ(conjuncts_.size(), conjunct_evals_.size()); + if (EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) { row_batch->CommitLastRow(); // The limit is handled outside of this loop. if (row_batch->AtCapacity()) break; @@ -171,8 +182,8 @@ Status UnnestNode::Reset(RuntimeState* state) { void UnnestNode::Close(RuntimeState* state) { if (is_closed()) return; - DCHECK(coll_expr_ctx_ != NULL); - coll_expr_ctx_->Close(state); + if (coll_expr_eval_ != nullptr) coll_expr_eval_->Close(state); + if (coll_expr_ != nullptr) coll_expr_->Close(); ExecNode::Close(state); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/unnest-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h index 1aeca74..247dd76 100644 --- a/be/src/exec/unnest-node.h +++ b/be/src/exec/unnest-node.h @@ -19,7 +19,8 @@ #define IMPALA_EXEC_UNNEST_NODE_H_ #include "exec/exec-node.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "runtime/collection-value.h" namespace impala { @@ -61,6 +62,10 @@ class UnnestNode : public ExecNode { virtual Status Reset(RuntimeState* state); virtual void Close(RuntimeState* state); + /// Initializes the expression which produces the collection to be unnested. + /// Called by the containing subplan node. + Status InitCollExpr(RuntimeState* state); + private: friend class SubplanNode; @@ -72,9 +77,11 @@ class UnnestNode : public ExecNode { /// Expr that produces the collection to be unnested. Currently always a SlotRef into an /// collection-typed slot. We do not evaluate this expr for setting coll_value_, but /// instead manually retrieve the slot value to support projection (see class comment). - ExprContext* coll_expr_ctx_; + const TExpr& thrift_coll_expr_; + ScalarExpr* coll_expr_; + ScalarExprEvaluator* coll_expr_eval_; - /// Descriptor of the collection-typed slot referenced by coll_expr_ctx_. Set in + /// Descriptor of the collection-typed slot referenced by coll_expr_eval_. Set in /// Prepare(). This slot is always set to NULL in Open() as a simple projection. 
const SlotDescriptor* coll_slot_desc_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 660b951..1fe3bcb 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -23,7 +23,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs") add_library(Exprs + agg-fn.cc agg-fn-evaluator.cc + agg-fn-evaluator-ir.cc aggregate-functions-ir.cc anyval-util.cc bit-byte-functions-ir.cc @@ -36,8 +38,6 @@ add_library(Exprs decimal-functions-ir.cc decimal-operators-ir.cc expr.cc - expr-context.cc - expr-ir.cc hive-udf-call.cc in-predicate-ir.cc is-not-empty-predicate.cc @@ -49,6 +49,9 @@ add_library(Exprs math-functions-ir.cc null-literal.cc operators-ir.cc + scalar-expr.cc + scalar-expr-evaluator.cc + scalar-expr-ir.cc slot-ref.cc string-functions-ir.cc timestamp-functions.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator-ir.cc b/be/src/exprs/agg-fn-evaluator-ir.cc new file mode 100644 index 0000000..407bdcb --- /dev/null +++ b/be/src/exprs/agg-fn-evaluator-ir.cc @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/agg-fn-evaluator.h" + +using namespace impala; + +FunctionContext* AggFnEvaluator::agg_fn_ctx() const { + return agg_fn_ctx_.get(); +} + +ScalarExprEvaluator* const* AggFnEvaluator::input_evals() const { + return input_evals_.data(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc index 81c0277..910eef1 100644 --- a/be/src/exprs/agg-fn-evaluator.cc +++ b/be/src/exprs/agg-fn-evaluator.cc @@ -24,7 +24,8 @@ #include "exec/aggregation-node.h" #include "exprs/aggregate-functions.h" #include "exprs/anyval-util.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/scalar-fn-call.h" #include "runtime/lib-cache.h" #include "runtime/raw-value.h" @@ -68,186 +69,134 @@ typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&); typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&); typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&); -Status AggFnEvaluator::Create(ObjectPool* pool, const TExpr& desc, - AggFnEvaluator** result) { - return Create(pool, desc, false, result); +const char* AggFnEvaluator::LLVM_CLASS_NAME = "class.impala::AggFnEvaluator"; + +AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone) + : is_clone_(is_clone), + agg_fn_(agg_fn), + mem_pool_(mem_pool) { } -Status 
AggFnEvaluator::Create(ObjectPool* pool, const TExpr& desc, - bool is_analytic_fn, AggFnEvaluator** result) { - DCHECK_GT(desc.nodes.size(), 0); - *result = pool->Add(new AggFnEvaluator(desc.nodes[0], is_analytic_fn)); - int node_idx = 0; - for (int i = 0; i < desc.nodes[0].num_children; ++i) { - ++node_idx; - Expr* expr = NULL; - ExprContext* ctx = NULL; - RETURN_IF_ERROR(Expr::CreateTreeFromThrift( - pool, desc.nodes, NULL, &node_idx, &expr, &ctx)); - (*result)->input_expr_ctxs_.push_back(ctx); - } - return Status::OK(); +AggFnEvaluator::~AggFnEvaluator() { + DCHECK(closed_); } -AggFnEvaluator::AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn) - : fn_(desc.fn), - is_merge_(desc.agg_expr.is_merge_agg), - is_analytic_fn_(is_analytic_fn), - intermediate_slot_desc_(NULL), - output_slot_desc_(NULL), - arg_type_descs_(AnyValUtil::ColumnTypesToTypeDescs( - ColumnType::FromThrift(desc.agg_expr.arg_types))), - cache_entry_(NULL), - init_fn_(NULL), - update_fn_(NULL), - remove_fn_(NULL), - merge_fn_(NULL), - serialize_fn_(NULL), - get_value_fn_(NULL), - finalize_fn_(NULL) { - DCHECK(desc.fn.__isset.aggregate_fn); - DCHECK(desc.node_type == TExprNodeType::AGGREGATE_EXPR); - // TODO: remove. 
See comment with AggregationOp - if (fn_.name.function_name == "count") { - agg_op_ = COUNT; - } else if (fn_.name.function_name == "min") { - agg_op_ = MIN; - } else if (fn_.name.function_name == "max") { - agg_op_ = MAX; - } else if (fn_.name.function_name == "sum") { - agg_op_ = SUM; - } else if (fn_.name.function_name == "avg") { - agg_op_ = AVG; - } else if (fn_.name.function_name == "ndv" || - fn_.name.function_name == "ndv_no_finalize") { - agg_op_ = NDV; - } else { - agg_op_ = OTHER; - } +const SlotDescriptor& AggFnEvaluator::intermediate_slot_desc() const { + return agg_fn_.intermediate_slot_desc(); } -AggFnEvaluator::~AggFnEvaluator() { - DCHECK(cache_entry_ == NULL) << "Need to call Close()"; +const ColumnType& AggFnEvaluator::intermediate_type() const { + return agg_fn_.intermediate_type(); } -Status AggFnEvaluator::Prepare(RuntimeState* state, const RowDescriptor& desc, - const SlotDescriptor* intermediate_slot_desc, - const SlotDescriptor* output_slot_desc, - MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx) { - DCHECK(intermediate_slot_desc != NULL); - DCHECK_EQ(intermediate_slot_desc->type().type, - ColumnType::FromThrift(fn_.aggregate_fn.intermediate_type).type); - DCHECK(intermediate_slot_desc_ == NULL); - intermediate_slot_desc_ = intermediate_slot_desc; - - DCHECK(output_slot_desc != NULL); - DCHECK_EQ(output_slot_desc->type().type, ColumnType::FromThrift(fn_.ret_type).type); - DCHECK(output_slot_desc_ == NULL); - output_slot_desc_ = output_slot_desc; - - RETURN_IF_ERROR( - Expr::Prepare(input_expr_ctxs_, state, desc, agg_fn_pool->mem_tracker())); - - for (int i = 0; i < input_expr_ctxs_.size(); ++i) { +Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool, + MemPool* mem_pool, AggFnEvaluator** result) { + *result = nullptr; + + // Create a new AggFn evaluator. 
+ AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, mem_pool, false)); + agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, mem_pool, + agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(), + agg_fn.arg_type_descs())); + + Status status; + // Create the evaluators for the input expressions. + for (const ScalarExpr* input_expr : agg_fn.children()) { + ScalarExprEvaluator* input_eval; + status = ScalarExprEvaluator::Create(*input_expr, state, pool, mem_pool, &input_eval); + if (UNLIKELY(!status.ok())) goto cleanup; + agg_fn_eval->input_evals_.push_back(input_eval); + DCHECK(&input_eval->root() == input_expr); + AnyVal* staging_input_val; - RETURN_IF_ERROR( - AllocateAnyVal(state, agg_fn_pool, input_expr_ctxs_[i]->root()->type(), - "Could not allocate aggregate expression input value", &staging_input_val)); - staging_input_vals_.push_back(staging_input_val); + status = AllocateAnyVal(state, mem_pool, input_expr->type(), + "Could not allocate aggregate expression input value", &staging_input_val); + agg_fn_eval->staging_input_vals_.push_back(staging_input_val); + if (UNLIKELY(!status.ok())) goto cleanup; } - RETURN_IF_ERROR(AllocateAnyVal(state, agg_fn_pool, intermediate_type(), + DCHECK_EQ(agg_fn.GetNumChildren(), agg_fn_eval->input_evals_.size()); + DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), agg_fn_eval->input_evals_.size()); + + status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(), "Could not allocate aggregate expression intermediate value", - &staging_intermediate_val_)); - RETURN_IF_ERROR(AllocateAnyVal(state, agg_fn_pool, intermediate_type(), + &(agg_fn_eval->staging_intermediate_val_)); + if (UNLIKELY(!status.ok())) goto cleanup; + status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(), "Could not allocate aggregate expression merge input value", - &staging_merge_input_val_)); + &(agg_fn_eval->staging_merge_input_val_)); + if (UNLIKELY(!status.ok())) goto cleanup; - if 
(is_merge_) { - DCHECK_EQ(staging_input_vals_.size(), 1) << "Merge should only have 1 input."; + if (agg_fn.is_merge()) { + DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), 1) + << "Merge should only have 1 input."; } + *result = agg_fn_eval; + return Status::OK(); - // Load the function pointers. Merge is not required if this is evaluating an - // analytic function. - if (fn_.aggregate_fn.init_fn_symbol.empty() || - fn_.aggregate_fn.update_fn_symbol.empty() || - (!is_analytic_fn_ && fn_.aggregate_fn.merge_fn_symbol.empty())) { - // This path is only for partially implemented builtins. - DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN); - stringstream ss; - ss << "Function " << fn_.name.function_name << " is not implemented."; - return Status(ss.str()); - } - - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr( - fn_.hdfs_location, fn_.aggregate_fn.init_fn_symbol, &init_fn_, &cache_entry_)); - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr( - fn_.hdfs_location, fn_.aggregate_fn.update_fn_symbol, &update_fn_, &cache_entry_)); - - // Merge() is not loaded if evaluating the agg fn as an analytic function. 
- if (!is_analytic_fn_) { - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, - fn_.aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_)); - } +cleanup: + DCHECK(!status.ok()); + agg_fn_eval->Close(state); + return status; +} - // Serialize(), GetValue(), Remove() and Finalize() are optional - if (!fn_.aggregate_fn.serialize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr( - fn_.hdfs_location, fn_.aggregate_fn.serialize_fn_symbol, &serialize_fn_, - &cache_entry_)); - } - if (!fn_.aggregate_fn.get_value_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr( - fn_.hdfs_location, fn_.aggregate_fn.get_value_fn_symbol, &get_value_fn_, - &cache_entry_)); - } - if (!fn_.aggregate_fn.remove_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, - fn_.aggregate_fn.remove_fn_symbol, &remove_fn_, &cache_entry_)); +Status AggFnEvaluator::Create(const vector<AggFn*>& agg_fns, RuntimeState* state, + ObjectPool* pool, MemPool* mem_pool, vector<AggFnEvaluator*>* evals) { + for (const AggFn* agg_fn : agg_fns) { + AggFnEvaluator* agg_fn_eval; + RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, mem_pool, &agg_fn_eval)); + evals->push_back(agg_fn_eval); } - if (!fn_.aggregate_fn.finalize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, - fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &cache_entry_)); - } - *agg_fn_ctx = FunctionContextImpl::CreateContext(state, agg_fn_pool, - GetIntermediateTypeDesc(), GetOutputTypeDesc(), arg_type_descs_); return Status::OK(); } -Status AggFnEvaluator::Open(RuntimeState* state, FunctionContext* agg_fn_ctx) { - RETURN_IF_ERROR(Expr::Open(input_expr_ctxs_, state)); +Status AggFnEvaluator::Open(RuntimeState* state) { + if (opened_) return Status::OK(); + opened_ = true; + RETURN_IF_ERROR(ScalarExprEvaluator::Open(input_evals_, state)); // Now that we have opened all 
our input exprs, it is safe to evaluate any constant // values for the UDA's FunctionContext (we cannot evaluate exprs before calling Open() // on them). - vector<AnyVal*> constant_args(input_expr_ctxs_.size()); - for (int i = 0; i < input_expr_ctxs_.size(); ++i) { - ExprContext* input_ctx = input_expr_ctxs_[i]; - AnyVal* const_val; - RETURN_IF_ERROR(input_ctx->root()->GetConstVal(state, input_ctx, &const_val)); - constant_args[i] = const_val; + vector<AnyVal*> constant_args(input_evals_.size(), nullptr); + for (int i = 0; i < input_evals_.size(); ++i) { + ScalarExprEvaluator* eval = input_evals_[i]; + RETURN_IF_ERROR(eval->GetConstValue(state, *(agg_fn_.GetChild(i)), + &constant_args[i])); } - agg_fn_ctx->impl()->SetConstantArgs(move(constant_args)); + agg_fn_ctx_->impl()->SetConstantArgs(move(constant_args)); + return Status::OK(); +} + +Status AggFnEvaluator::Open( + const vector<AggFnEvaluator*>& evals, RuntimeState* state) { + for (AggFnEvaluator* eval : evals) RETURN_IF_ERROR(eval->Open(state)); return Status::OK(); } void AggFnEvaluator::Close(RuntimeState* state) { - Expr::Close(input_expr_ctxs_, state); + if (closed_) return; + closed_ = true; + if (!is_clone_) ScalarExprEvaluator::Close(input_evals_, state); + FreeLocalAllocations(); + agg_fn_ctx_->impl()->Close(); + agg_fn_ctx_.reset(); + input_evals_.clear(); +} - if (cache_entry_ != NULL) { - LibCache::instance()->DecrementUseCount(cache_entry_); - cache_entry_ = NULL; - } +void AggFnEvaluator::Close( + const vector<AggFnEvaluator*>& evals, RuntimeState* state) { + for (AggFnEvaluator* eval : evals) eval->Close(state); } -inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src, - const SlotDescriptor* dst_slot_desc, Tuple* dst) { +void AggFnEvaluator::SetDstSlot(const AnyVal* src, const SlotDescriptor& dst_slot_desc, + Tuple* dst) { if (src->is_null) { - dst->SetNull(dst_slot_desc->null_indicator_offset()); + dst->SetNull(dst_slot_desc.null_indicator_offset()); return; } - 
dst->SetNotNull(dst_slot_desc->null_indicator_offset()); - void* slot = dst->GetSlot(dst_slot_desc->tuple_offset()); - switch (dst_slot_desc->type().type) { + dst->SetNotNull(dst_slot_desc.null_indicator_offset()); + void* slot = dst->GetSlot(dst_slot_desc.tuple_offset()); + switch (dst_slot_desc.type().type) { case TYPE_NULL: return; case TYPE_BOOLEAN: @@ -278,7 +227,7 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src, return; case TYPE_CHAR: if (slot != reinterpret_cast<const StringVal*>(src)->ptr) { - ctx->SetError("UDA should not set pointer of CHAR(N) intermediate"); + agg_fn_ctx_->SetError("UDA should not set pointer of CHAR(N) intermediate"); } return; case TYPE_TIMESTAMP: @@ -286,7 +235,7 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src, *reinterpret_cast<const TimestampVal*>(src)); return; case TYPE_DECIMAL: - switch (dst_slot_desc->type().GetByteSize()) { + switch (dst_slot_desc.type().GetByteSize()) { case 4: *reinterpret_cast<int32_t*>(slot) = reinterpret_cast<const DecimalVal*>(src)->val4; @@ -310,96 +259,101 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src, break; } default: - DCHECK(false) << "NYI: " << dst_slot_desc->type(); + DCHECK(false) << "NYI: " << dst_slot_desc.type(); } } // This function would be replaced in codegen. 
-void AggFnEvaluator::Init(FunctionContext* agg_fn_ctx, Tuple* dst) { - DCHECK(init_fn_ != NULL); - for (ExprContext* ctx : input_expr_ctxs_) DCHECK(ctx->opened()); +void AggFnEvaluator::Init(Tuple* dst) { + DCHECK(opened_); + DCHECK(agg_fn_.init_fn_ != nullptr); + for (ScalarExprEvaluator* input_eval : input_evals_) { + DCHECK(input_eval->opened()); + } - if (intermediate_type().type == TYPE_CHAR) { + const ColumnType& type = intermediate_type(); + const SlotDescriptor& slot_desc = intermediate_slot_desc(); + if (type.type == TYPE_CHAR) { // For type char, we want to initialize the staging_intermediate_val_ with // a pointer into the tuple (the UDA should not be allocating it). - void* slot = dst->GetSlot(intermediate_slot_desc_->tuple_offset()); + void* slot = dst->GetSlot(slot_desc.tuple_offset()); StringVal* sv = reinterpret_cast<StringVal*>(staging_intermediate_val_); - sv->is_null = dst->IsNull(intermediate_slot_desc_->null_indicator_offset()); - sv->ptr = reinterpret_cast<uint8_t*>( - StringValue::CharSlotToPtr(slot, intermediate_type())); - sv->len = intermediate_type().len; + sv->is_null = dst->IsNull(slot_desc.null_indicator_offset()); + sv->ptr = reinterpret_cast<uint8_t*>(StringValue::CharSlotToPtr(slot, type)); + sv->len = type.len; } - reinterpret_cast<InitFn>(init_fn_)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, dst); - agg_fn_ctx->impl()->set_num_updates(0); - agg_fn_ctx->impl()->set_num_removes(0); + reinterpret_cast<InitFn>(agg_fn_.init_fn_)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(staging_intermediate_val_, slot_desc, dst); + agg_fn_ctx_->impl()->set_num_updates(0); + agg_fn_ctx_->impl()->set_num_removes(0); } -static void SetAnyVal(const SlotDescriptor* desc, Tuple* tuple, AnyVal* dst) { - bool is_null = tuple->IsNull(desc->null_indicator_offset()); - void* slot = NULL; - if (!is_null) slot = tuple->GetSlot(desc->tuple_offset()); - 
AnyValUtil::SetAnyVal(slot, desc->type(), dst); +static void SetAnyVal(const SlotDescriptor& desc, Tuple* tuple, AnyVal* dst) { + bool is_null = tuple->IsNull(desc.null_indicator_offset()); + void* slot = nullptr; + if (!is_null) slot = tuple->GetSlot(desc.tuple_offset()); + AnyValUtil::SetAnyVal(slot, desc.type(), dst); } -void AggFnEvaluator::Update( - FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn) { - if (fn == NULL) return; +void AggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) { + if (fn == nullptr) return; - SetAnyVal(intermediate_slot_desc_, dst, staging_intermediate_val_); + const SlotDescriptor& slot_desc = intermediate_slot_desc(); + SetAnyVal(slot_desc, dst, staging_intermediate_val_); - for (int i = 0; i < input_expr_ctxs_.size(); ++i) { - void* src_slot = input_expr_ctxs_[i]->GetValue(row); - AnyValUtil::SetAnyVal( - src_slot, input_expr_ctxs_[i]->root()->type(), staging_input_vals_[i]); + for (int i = 0; i < input_evals_.size(); ++i) { + void* src_slot = input_evals_[i]->GetValue(row); + DCHECK(&input_evals_[i]->root() == agg_fn_.GetChild(i)); + AnyValUtil::SetAnyVal(src_slot, agg_fn_.GetChild(i)->type(), staging_input_vals_[i]); } // TODO: this part is not so good and not scalable. It can be replaced with // codegen but we can also consider leaving it for the first few cases for // debugging. 
- switch (input_expr_ctxs_.size()) { + switch (input_evals_.size()) { case 0: - reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx, staging_intermediate_val_); + reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_); break; case 1: - reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], staging_intermediate_val_); break; case 2: - reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], staging_intermediate_val_); break; case 3: - reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], staging_intermediate_val_); break; case 4: - reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], *staging_input_vals_[3], staging_intermediate_val_); break; case 5: - reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], *staging_input_vals_[3], *staging_input_vals_[4], staging_intermediate_val_); break; case 6: - reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], *staging_input_vals_[3], *staging_input_vals_[4], *staging_input_vals_[5], staging_intermediate_val_); break; case 7: - reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx, + reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], *staging_input_vals_[3], *staging_input_vals_[4], *staging_input_vals_[5], *staging_input_vals_[6], staging_intermediate_val_); break; case 8: - reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx, + 
reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(), *staging_input_vals_[0], *staging_input_vals_[1], *staging_input_vals_[2], *staging_input_vals_[3], *staging_input_vals_[4], *staging_input_vals_[5], @@ -409,101 +363,113 @@ void AggFnEvaluator::Update( default: DCHECK(false) << "NYI"; } - SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, dst); + SetDstSlot(staging_intermediate_val_, slot_desc, dst); } -void AggFnEvaluator::Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) { - DCHECK(merge_fn_ != NULL); +void AggFnEvaluator::Merge(Tuple* src, Tuple* dst) { + DCHECK(agg_fn_.merge_fn_ != nullptr); - SetAnyVal(intermediate_slot_desc_, dst, staging_intermediate_val_); - SetAnyVal(intermediate_slot_desc_, src, staging_merge_input_val_); + const SlotDescriptor& slot_desc = intermediate_slot_desc(); + SetAnyVal(slot_desc, dst, staging_intermediate_val_); + SetAnyVal(slot_desc, src, staging_merge_input_val_); // The merge fn always takes one input argument. - reinterpret_cast<UpdateFn1>(merge_fn_)(agg_fn_ctx, + reinterpret_cast<UpdateFn1>(agg_fn_.merge_fn_)(agg_fn_ctx_.get(), *staging_merge_input_val_, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, dst); + SetDstSlot(staging_intermediate_val_, slot_desc, dst); } -void AggFnEvaluator::SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src, - const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn) { +void AggFnEvaluator::SerializeOrFinalize(Tuple* src, + const SlotDescriptor& dst_slot_desc, Tuple* dst, void* fn) { // No fn was given and the src and dst are identical. Nothing to be done. - if (fn == NULL && src == dst) return; + if (fn == nullptr && src == dst) return; // src != dst means we are performing a Finalize(), so even if fn == null we // still must copy the value of the src slot into dst. 
- bool src_slot_null = src->IsNull(intermediate_slot_desc_->null_indicator_offset()); - void* src_slot = NULL; - if (!src_slot_null) src_slot = src->GetSlot(intermediate_slot_desc_->tuple_offset()); + const SlotDescriptor& slot_desc = intermediate_slot_desc(); + bool src_slot_null = src->IsNull(slot_desc.null_indicator_offset()); + void* src_slot = nullptr; + if (!src_slot_null) src_slot = src->GetSlot(slot_desc.tuple_offset()); // No fn was given but the src and dst tuples are different (doing a Finalize()). // Just copy the src slot into the dst tuple. - if (fn == NULL) { - DCHECK_EQ(intermediate_type(), dst_slot_desc->type()); - RawValue::Write(src_slot, dst, dst_slot_desc, NULL); + if (fn == nullptr) { + DCHECK_EQ(intermediate_type(), dst_slot_desc.type()); + RawValue::Write(src_slot, dst, &dst_slot_desc, nullptr); return; } AnyValUtil::SetAnyVal(src_slot, intermediate_type(), staging_intermediate_val_); - switch (dst_slot_desc->type().type) { + switch (dst_slot_desc.type().type) { case TYPE_BOOLEAN: { typedef BooleanVal(*Fn)(FunctionContext*, AnyVal*); - BooleanVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + BooleanVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_TINYINT: { typedef TinyIntVal(*Fn)(FunctionContext*, AnyVal*); - TinyIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + TinyIntVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_SMALLINT: { typedef SmallIntVal(*Fn)(FunctionContext*, AnyVal*); - SmallIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + SmallIntVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + 
SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_INT: { typedef IntVal(*Fn)(FunctionContext*, AnyVal*); - IntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + IntVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_BIGINT: { typedef BigIntVal(*Fn)(FunctionContext*, AnyVal*); - BigIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + BigIntVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_FLOAT: { typedef FloatVal(*Fn)(FunctionContext*, AnyVal*); - FloatVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + FloatVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_DOUBLE: { typedef DoubleVal(*Fn)(FunctionContext*, AnyVal*); - DoubleVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + DoubleVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_STRING: case TYPE_VARCHAR: { typedef StringVal(*Fn)(FunctionContext*, AnyVal*); - StringVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + StringVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_DECIMAL: { typedef DecimalVal(*Fn)(FunctionContext*, AnyVal*); - DecimalVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + DecimalVal v = reinterpret_cast<Fn>(fn)( + 
agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } case TYPE_TIMESTAMP: { typedef TimestampVal(*Fn)(FunctionContext*, AnyVal*); - TimestampVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_); - SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst); + TimestampVal v = reinterpret_cast<Fn>(fn)( + agg_fn_ctx_.get(), staging_intermediate_val_); + SetDstSlot(&v, dst_slot_desc, dst); break; } default: @@ -511,56 +477,34 @@ void AggFnEvaluator::SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src } } -/// Gets the update or merge function for this UDA. -Status AggFnEvaluator::GetUpdateOrMergeFunction(LlvmCodeGen* codegen, Function** uda_fn) { - const string& symbol = - is_merge_ ? fn_.aggregate_fn.merge_fn_symbol : fn_.aggregate_fn.update_fn_symbol; - vector<ColumnType> fn_arg_types; - for (ExprContext* input_expr_ctx : input_expr_ctxs_) { - fn_arg_types.push_back(input_expr_ctx->root()->type()); - } - // The intermediate value is passed as the last argument. - fn_arg_types.push_back(intermediate_type()); - RETURN_IF_ERROR(codegen->LoadFunction(fn_, symbol, NULL, fn_arg_types, - fn_arg_types.size(), false, uda_fn, &cache_entry_)); - - // Inline constants into the function body (if there is an IR body). - if (!(*uda_fn)->isDeclaration()) { - // TODO: IMPALA-4785: we should also replace references to GetIntermediateType() - // with constants. 
- codegen->InlineConstFnAttrs(GetOutputTypeDesc(), arg_type_descs_, *uda_fn); - *uda_fn = codegen->FinalizeFunction(*uda_fn); - if (*uda_fn == NULL) { - return Status(TErrorCode::UDF_VERIFY_FAILED, symbol, fn_.hdfs_location); - } - } - return Status::OK(); -} - -FunctionContext::TypeDesc AggFnEvaluator::GetIntermediateTypeDesc() const { - return AnyValUtil::ColumnTypeToTypeDesc(intermediate_slot_desc_->type()); +void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool, + AggFnEvaluator** cloned_eval) const { + DCHECK(opened_); + *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, mem_pool, true)); + (*cloned_eval)->agg_fn_ctx_.reset(agg_fn_ctx_->impl()->Clone(mem_pool)); + DCHECK_EQ((*cloned_eval)->input_evals_.size(), 0); + (*cloned_eval)->input_evals_ = input_evals_; + (*cloned_eval)->staging_input_vals_ = staging_input_vals_; + (*cloned_eval)->staging_intermediate_val_ = staging_intermediate_val_; + (*cloned_eval)->staging_merge_input_val_ = staging_merge_input_val_; + (*cloned_eval)->opened_ = true; } -FunctionContext::TypeDesc AggFnEvaluator::GetOutputTypeDesc() const { - return AnyValUtil::ColumnTypeToTypeDesc(output_slot_desc_->type()); +void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool, + const vector<AggFnEvaluator*>& evals, + vector<AggFnEvaluator*>* cloned_evals) { + for (const AggFnEvaluator* eval : evals) { + AggFnEvaluator* cloned_eval; + eval->ShallowClone(pool, mem_pool, &cloned_eval); + cloned_evals->push_back(cloned_eval); + } } -string AggFnEvaluator::DebugString(const vector<AggFnEvaluator*>& exprs) { - stringstream out; - out << "["; - for (int i = 0; i < exprs.size(); ++i) { - out << (i == 0 ? 
"" : " ") << exprs[i]->DebugString(); - } - out << "]"; - return out.str(); +void AggFnEvaluator::FreeLocalAllocations() { + ScalarExprEvaluator::FreeLocalAllocations(input_evals_); + agg_fn_ctx_->impl()->FreeLocalAllocations(); } -string AggFnEvaluator::DebugString() const { - stringstream out; - out << "AggFnEvaluator(op=" << agg_op_; - for (int i = 0; i < input_expr_ctxs_.size(); ++i) { - out << " " << input_expr_ctxs_[i]->root()->DebugString() << ")"; - } - out << ")"; - return out.str(); +void AggFnEvaluator::FreeLocalAllocations(const vector<AggFnEvaluator*>& evals) { + for (AggFnEvaluator* eval : evals) eval->FreeLocalAllocations(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index 712bf40..2ce968c 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -23,6 +23,7 @@ #include <boost/scoped_ptr.hpp> #include "common/status.h" +#include "exprs/agg-fn.h" #include "runtime/descriptors.h" #include "runtime/lib-cache.h" #include "runtime/tuple-row.h" @@ -34,13 +35,8 @@ #include "gen-cpp/PlanNodes_types.h" #include "gen-cpp/Types_types.h" -using namespace impala_udf; - namespace impala { -class AggregationNode; -class Expr; -class ExprContext; class MemPool; class MemTracker; class ObjectPool; @@ -51,98 +47,94 @@ class Tuple; class TupleRow; class TExprNode; -/// This class evaluates aggregate functions. Aggregate functions can either be -/// builtins or external UDAs. For both of types types, they can either use codegen -/// or not. -// -/// This class provides an interface that's 1:1 with the UDA interface and serves -/// as glue code between the TupleRow/Tuple signature used by the AggregationNode -/// and the AnyVal signature of the UDA interface. 
It handles evaluating input -/// slots from TupleRows and aggregating the result to the result tuple. -// -/// This class is not threadsafe. However, it can be used for multiple interleaved -/// evaluations of the aggregation function by using multiple FunctionContexts. +/// AggFnEvaluator is the interface for evaluating aggregate functions during execution. +/// +/// AggFnEvaluator contains runtime state and implements wrapper functions which convert +/// the input TupleRow into AnyVal format expected by UDAF functions defined in AggFn. +/// It also evaluates TupleRow against input expressions, stores the results in staging +/// input values which are passed to Update() function to update the intermediate value +/// and handles the merging of intermediate values in the merge phases of execution. +/// +/// This class is not threadsafe. An evaluator can be cloned to isolate resource +/// consumption per partition in an aggregation node. +/// class AggFnEvaluator { public: - /// TODO: The aggregation node has custom codegen paths for a few of the builtins. - /// That logic needs to be removed. For now, add some enums for those builtins. - enum AggregationOp { - COUNT, - MIN, - MAX, - SUM, - AVG, - NDV, - OTHER, - }; - - /// Creates an AggFnEvaluator object from desc. The object is added to 'pool' - /// and returned in *result. This constructs the input Expr trees for - /// this aggregate function as specified in desc. The result is returned in - /// *result. - static Status Create(ObjectPool* pool, const TExpr& desc, AggFnEvaluator** result); - - /// Creates an AggFnEvaluator object from desc. If is_analytic_fn, the evaluator is - /// prepared for analytic function evaluation. - /// TODO: Avoid parameter for analytic fns, should this be added to TAggregateExpr? - static Status Create(ObjectPool* pool, const TExpr& desc, bool is_analytic_fn, - AggFnEvaluator** result); - - /// Initializes the agg expr. 'desc' must be the row descriptor for the input TupleRow. 
- /// It is used to get the input values in the Update() and Merge() functions. - /// 'intermediate_slot_desc' is the slot into which this evaluator should write the - /// results of Update()/Merge()/Serialize(). - /// 'output_slot_desc' is the slot into which this evaluator should write the results - /// of Finalize() - /// 'agg_fn_ctx' will be initialized for the agg function using 'agg_fn_pool'. Caller - /// is responsible for closing and deleting 'agg_fn_ctx'. - Status Prepare(RuntimeState* state, const RowDescriptor& desc, - const SlotDescriptor* intermediate_slot_desc, - const SlotDescriptor* output_slot_desc, - MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx); + /// Creates an AggFnEvaluator object from the aggregate expression 'agg_fn'. + /// The evaluator is added to 'pool' and returned in 'eval'. This will also + /// create a single evaluator for each input expression. All allocations will come + /// from 'mem_pool'. Note that it's the responsibility to call Close() all evaluators + /// even if this function returns error status on initialization failure. + static Status Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool, + MemPool* mem_pool, AggFnEvaluator** eval) WARN_UNUSED_RESULT; + + /// Convenience functions for creating evaluators for multiple aggregate functions. + static Status Create(const std::vector<AggFn*>& agg_fns, RuntimeState* state, + ObjectPool* pool, MemPool* mem_pool, std::vector<AggFnEvaluator*>* evals) + WARN_UNUSED_RESULT; ~AggFnEvaluator(); - /// 'agg_fn_ctx' may be cloned after calling Open(). Note that closing all - /// FunctionContexts, including the original one returned by Prepare(), is the - /// responsibility of the caller. - Status Open(RuntimeState* state, FunctionContext* agg_fn_ctx); - + /// Initializes the evaluator by calling Open() on all the input expressions' evaluators + /// and caches all constant input arguments. + /// TODO: Move the evaluation of constant input arguments to AggFn setup. 
+ Status Open(RuntimeState* state) WARN_UNUSED_RESULT; + + /// Convenience functions for opening multiple AggFnEvaluators. + static Status Open(const std::vector<AggFnEvaluator*>& evals, + RuntimeState* state) WARN_UNUSED_RESULT; + + /// Used by PartitionedAggregation node to initialize one evaluator per partition. + /// Avoid the overhead of re-initializing an evaluator (e.g. calling GetConstVal() + /// on the input expressions). Cannot be called until after Open() has been called. + /// 'cloned_eval' is a shallow copy of this evaluator: all input values, staging + /// intermediate values and merge values are shared with the original evaluator. Only + /// the FunctionContext 'agg_fn_ctx' is cloned for resource isolation per partition. + /// So, it's not safe to use cloned evaluators concurrently. + void ShallowClone( + ObjectPool* pool, MemPool* mem_pool, AggFnEvaluator** cloned_eval) const; + + /// Convenience function for cloning multiple evaluators. The newly cloned evaluators + /// are appended to 'cloned_evals'. + static void ShallowClone(ObjectPool* pool, MemPool* mem_pool, + const std::vector<AggFnEvaluator*>& evals, + std::vector<AggFnEvaluator*>* cloned_evals); + + /// Free resources owned by the evaluator. 
void Close(RuntimeState* state); + static void Close(const std::vector<AggFnEvaluator*>& evals, RuntimeState* state); - const ColumnType& intermediate_type() const { return intermediate_slot_desc_->type(); } - bool is_merge() const { return is_merge_; } - AggregationOp agg_op() const { return agg_op_; } - const std::vector<ExprContext*>& input_expr_ctxs() const { return input_expr_ctxs_; } - bool is_count_star() const { return agg_op_ == COUNT && input_expr_ctxs_.empty(); } - bool is_builtin() const { return fn_.binary_type == TFunctionBinaryType::BUILTIN; } - bool SupportsRemove() const { return remove_fn_ != NULL; } - bool SupportsSerialize() const { return serialize_fn_ != NULL; } - const std::string& fn_name() const { return fn_.name.function_name; } - const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; } - - static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs); - std::string DebugString() const; + const AggFn& agg_fn() const { return agg_fn_; } + + FunctionContext* IR_ALWAYS_INLINE agg_fn_ctx() const; - /// Functions for different phases of the aggregation. - void Init(FunctionContext* agg_fn_ctx, Tuple* dst); + ScalarExprEvaluator* const* IR_ALWAYS_INLINE input_evals() const; + + /// Call the initialization function of the AggFn. May update 'dst'. + void Init(Tuple* dst); /// Updates the intermediate state dst based on adding the input src row. This can be - /// called either to drive the UDA's Update() or Merge() function depending on - /// is_merge_. That is, from the caller, it doesn't mater. - void Add(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst); + /// called either to drive the UDA's Update() or Merge() function, depending on whether + /// the AggFn is a merging aggregation. + void Add(const TupleRow* src, Tuple* dst); - /// Updates the intermediate state dst to remove the input src row, i.e. undoes + /// Updates the intermediate state dst to remove the input src row, i.e. 
undo /// Add(src, dst). Only used internally for analytic fn builtins. - void Remove(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst); + void Remove(const TupleRow* src, Tuple* dst); - /// Explicitly does a merge, even if this evalutor is not marked as merging. + /// Explicitly does a merge, even if this evaluator is not marked as merging. /// This is used by the partitioned agg node when it needs to merge spill results. /// In the non-spilling case, this node would normally not merge. - void Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst); + void Merge(Tuple* src, Tuple* dst); + + /// Flattens any intermediate values containing pointers, and frees any memory + /// allocated during the init, update and merge phases. + void Serialize(Tuple* dst); - void Serialize(FunctionContext* agg_fn_ctx, Tuple* dst); - void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst); + /// Does one final transformation of the aggregated value in 'agg_val' and stores the + /// result in 'output_val'. Also frees the resources allocated during init, update and + /// merge phases. + void Finalize(Tuple* agg_val, Tuple* output_val); /// Puts the finalized value from Tuple* src in Tuple* dst just as Finalize() does. /// However, unlike Finalize(), GetValue() does not clean up state in src. @@ -150,169 +142,155 @@ class AggFnEvaluator { /// analytic fn builtins. Note that StringVal result is from local allocation (which /// will be freed in the next QueryMaintenance()) so it needs to be copied out if it /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row batch). - void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst); + void GetValue(Tuple* src, Tuple* dst); /// Helper functions for calling the above functions on many evaluators. 
- static void Init(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst); - static void Add(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst); - static void Remove(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst); - static void Serialize(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst); - static void GetValue(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst); - static void Finalize(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst); - - /// Gets the codegened update or merge function for this aggregate function. - Status GetUpdateOrMergeFunction(LlvmCodeGen* codegen, llvm::Function** uda_fn); + static void Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst); + static void Add(const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, + Tuple* dst); + static void Remove(const std::vector<AggFnEvaluator*>& evals, + const TupleRow* src, Tuple* dst); + static void Serialize(const std::vector<AggFnEvaluator*>& evals, + Tuple* dst); + static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src, + Tuple* dst); + static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src, + Tuple* dst); + + /// Free local allocations made in UDA functions and input arguments' evals. 
+ void FreeLocalAllocations(); + static void FreeLocalAllocations(const std::vector<AggFnEvaluator*>& evals); + + std::string DebugString() const; + static std::string DebugString(const std::vector<AggFnEvaluator*>& evals); + + static const char* LLVM_CLASS_NAME; private: - const TFunction fn_; - /// Indicates whether to Update() or Merge() - const bool is_merge_; - /// Indicates which functions must be loaded. - const bool is_analytic_fn_; + /// True if the evaluator has been initialized. + bool opened_ = false; - /// Slot into which Update()/Merge()/Serialize() write their result. Not owned. - const SlotDescriptor* intermediate_slot_desc_; + /// True if the evaluator has been closed. + bool closed_ = false; - /// Slot into which Finalize() results are written. Not owned. Identical to - /// intermediate_slot_desc_ if this agg fn has the same intermediate and output type. - const SlotDescriptor* output_slot_desc_; + /// True if this evaluator is created from a ShallowClone() call. + const bool is_clone_; - /// Expression contexts for this AggFnEvaluator. Empty if there is no - /// expression (e.g. count(*)). - std::vector<ExprContext*> input_expr_ctxs_; + const AggFn& agg_fn_; - /// The types of the arguments to the aggregate function. - const std::vector<FunctionContext::TypeDesc> arg_type_descs_; + /// Pointer to the MemPool which all allocations come from. + /// Owned by the exec node which owns this evaluator. + MemPool* mem_pool_ = nullptr; - /// The enum for some of the builtins that still require special cased logic. - AggregationOp agg_op_; + /// This contains runtime state such as constant input arguments to the aggregate + /// functions and a FreePool from which the intermediate values are allocated. + /// Owned by this evaluator. + boost::scoped_ptr<FunctionContext> agg_fn_ctx_; - /// Created to a subclass of AnyVal for type(). We use this to convert values - /// from the UDA interface to the Expr interface. 
- /// These objects are allocated in the runtime state's object pool. - /// TODO: this is awful, remove this when exprs are updated. - std::vector<impala_udf::AnyVal*> staging_input_vals_; - impala_udf::AnyVal* staging_intermediate_val_; - impala_udf::AnyVal* staging_merge_input_val_; + /// Evaluators for input expressions for this aggregate function. + /// Empty if there is no input expression (e.g. count(*)). + std::vector<ScalarExprEvaluator*> input_evals_; - /// Cache entry for the library containing the function ptrs. - LibCacheEntry* cache_entry_; + /// Staging input values used by the interpreted Update() / Merge() paths. + /// It stores the evaluation results of input expressions to be passed to the + /// Update() / Merge() function. + std::vector<impala_udf::AnyVal*> staging_input_vals_; - /// Function ptrs for the different phases of the aggregate function. - void* init_fn_; - void* update_fn_; - void* remove_fn_; - void* merge_fn_; - void* serialize_fn_; - void* get_value_fn_; - void* finalize_fn_; + /// Staging intermediate and merged values used in the interpreted + /// Update() / Merge() paths. + impala_udf::AnyVal* staging_intermediate_val_ = nullptr; + impala_udf::AnyVal* staging_merge_input_val_ = nullptr; /// Use Create() instead. - AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn); + AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone); /// Return the intermediate type of the aggregate function. - FunctionContext::TypeDesc GetIntermediateTypeDesc() const; - - /// Return the output type of the aggregate function. - FunctionContext::TypeDesc GetOutputTypeDesc() const; - - /// TODO: these functions below are not extensible and we need to use codegen to - /// generate the calls into the UDA functions (like for UDFs). - /// Remove these functions when this is supported. - - /// Sets up the arguments to call fn. 
This converts from the agg-expr signature, - /// taking TupleRow to the UDA signature taking AnvVals by populating the staging - /// AnyVals. - /// fn must be a function that implement's the UDA Update() signature. - void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn); + inline const SlotDescriptor& intermediate_slot_desc() const; + inline const ColumnType& intermediate_type() const; + + /// The interpreted path for the UDA's Update() function. It sets up the arguments to + /// call 'fn' is either the 'update_fn_' or 'merge_fn_' of agg_fn_, depending on whether + /// agg_fn_ is a merging aggregation. This converts from the agg-expr signature, taking + /// TupleRow to the UDA signature taking AnyVals by evaluating any input expressions + /// and populating the staging input values. + /// + /// Note that this function may be superseded by the codegend Update() IR function + /// generated by AggFn::CodegenUpdateOrMergeFunction() when codegen is enabled. + void Update(const TupleRow* row, Tuple* dst, void* fn); /// Sets up the arguments to call 'fn'. This converts from the agg-expr signature, - /// taking TupleRow to the UDA signature taking AnvVals. Writes the serialize/finalize + /// taking TupleRow to the UDA signature taking AnyVals. Writes the serialize/finalize /// result to the given destination slot/tuple. 'fn' can be NULL to indicate the src /// value should simply be written into the destination. Note that StringVal result is /// from local allocation (which will be freed in the next QueryMaintenance()) so it /// needs to be copied out if it needs to survive beyond QueryMaintenance() (e.g. if /// 'dst' lives in a row batch). 
- void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src, - const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn); + void SerializeOrFinalize(Tuple* src, const SlotDescriptor& dst_slot_desc, + Tuple* dst, void* fn); /// Writes the result in src into dst pointed to by dst_slot_desc - void SetDstSlot(FunctionContext* ctx, const impala_udf::AnyVal* src, - const SlotDescriptor* dst_slot_desc, Tuple* dst); + inline void SetDstSlot( + const impala_udf::AnyVal* src, const SlotDescriptor& dst_slot_desc, Tuple* dst); }; -inline void AggFnEvaluator::Add( - FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) { - agg_fn_ctx->impl()->IncrementNumUpdates(); - Update(agg_fn_ctx, row, dst, is_merge() ? merge_fn_ : update_fn_); +inline void AggFnEvaluator::Add(const TupleRow* row, Tuple* dst) { + agg_fn_ctx_->impl()->IncrementNumUpdates(); + Update(row, dst, agg_fn_.merge_or_update_fn()); } -inline void AggFnEvaluator::Remove( - FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) { - agg_fn_ctx->impl()->IncrementNumRemoves(); - Update(agg_fn_ctx, row, dst, remove_fn_); + +inline void AggFnEvaluator::Remove(const TupleRow* row, Tuple* dst) { + agg_fn_ctx_->impl()->IncrementNumRemoves(); + Update(row, dst, agg_fn_.remove_fn()); } -inline void AggFnEvaluator::Serialize( - FunctionContext* agg_fn_ctx, Tuple* tuple) { - SerializeOrFinalize(agg_fn_ctx, tuple, intermediate_slot_desc_, tuple, serialize_fn_); + +inline void AggFnEvaluator::Serialize(Tuple* tuple) { + SerializeOrFinalize(tuple, agg_fn_.intermediate_slot_desc(), tuple, + agg_fn_.serialize_fn()); } -inline void AggFnEvaluator::Finalize( - FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) { - SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, finalize_fn_); + +inline void AggFnEvaluator::Finalize(Tuple* agg_val, Tuple* output_val) { + SerializeOrFinalize(agg_val, agg_fn_.output_slot_desc(), output_val, + agg_fn_.finalize_fn()); } -inline void AggFnEvaluator::GetValue( - 
FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) { - SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, get_value_fn_); + +inline void AggFnEvaluator::GetValue(Tuple* src, Tuple* dst) { + SerializeOrFinalize(src, agg_fn_.output_slot_desc(), dst, + agg_fn_.get_value_fn()); } -inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - evaluators[i]->Init(fn_ctxs[i], dst); - } +inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst) { + for (int i = 0; i < evals.size(); ++i) evals[i]->Init(dst); } -inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - evaluators[i]->Add(fn_ctxs[i], src, dst); - } + +inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evals, + const TupleRow* src, Tuple* dst) { + for (int i = 0; i < evals.size(); ++i) evals[i]->Add(src, dst); } -inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - evaluators[i]->Remove(fn_ctxs[i], src, dst); - } + +inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evals, + const TupleRow* src, Tuple* dst) { + for (int i = 0; i < evals.size(); ++i) evals[i]->Remove(src, dst); } -inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - 
evaluators[i]->Serialize(fn_ctxs[i], dst); - } + +inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evals, + Tuple* dst) { + for (int i = 0; i < evals.size(); ++i) evals[i]->Serialize(dst); } -inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - evaluators[i]->GetValue(fn_ctxs[i], src, dst); - } + +inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evals, + Tuple* src, Tuple* dst) { + for (int i = 0; i < evals.size(); ++i) evals[i]->GetValue(src, dst); } -inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) { - DCHECK_EQ(evaluators.size(), fn_ctxs.size()); - for (int i = 0; i < evaluators.size(); ++i) { - evaluators[i]->Finalize(fn_ctxs[i], src, dst); + +inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evals, + Tuple* agg_val, Tuple* output_val) { + for (int i = 0; i < evals.size(); ++i) { + evals[i]->Finalize(agg_val, output_val); } } + + } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn.cc b/be/src/exprs/agg-fn.cc new file mode 100644 index 0000000..6e1d75f --- /dev/null +++ b/be/src/exprs/agg-fn.cc @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/agg-fn.h" + +#include "codegen/llvm-codegen.h" +#include "exprs/anyval-util.h" +#include "exprs/scalar-expr.h" +#include "runtime/descriptors.h" +#include "runtime/lib-cache.h" + +#include "common/names.h" + +using namespace impala_udf; +using namespace llvm; + +namespace impala { + +AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_desc, + const SlotDescriptor& output_slot_desc) + : Expr(tnode), + is_merge_(tnode.agg_expr.is_merge_agg), + intermediate_slot_desc_(intermediate_slot_desc), + output_slot_desc_(output_slot_desc), + arg_type_descs_(AnyValUtil::ColumnTypesToTypeDescs( + ColumnType::FromThrift(tnode.agg_expr.arg_types))) { + DCHECK(tnode.__isset.fn); + DCHECK(tnode.fn.__isset.aggregate_fn); + DCHECK_EQ(tnode.node_type, TExprNodeType::AGGREGATE_EXPR); + DCHECK_EQ(ColumnType::FromThrift(tnode.type).type, + ColumnType::FromThrift(fn_.ret_type).type); + const string& fn_name = fn_.name.function_name; + if (fn_name == "count") { + agg_op_ = COUNT; + } else if (fn_name == "min") { + agg_op_ = MIN; + } else if (fn_name == "max") { + agg_op_ = MAX; + } else if (fn_name == "sum") { + agg_op_ = SUM; + } else if (fn_name == "avg") { + agg_op_ = AVG; + } else if (fn_name == "ndv" || fn_name == "ndv_no_finalize") { + agg_op_ = NDV; + } else { + agg_op_ = OTHER; + } +} + +Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) { + // Initialize all children (i.e. input exprs to this aggregate expr). 
+ for (ScalarExpr* input_expr : children()) { + RETURN_IF_ERROR(input_expr->Init(row_desc, state)); + } + + // Initialize the aggregate expressions' internals. + const TAggregateFunction& aggregate_fn = fn_.aggregate_fn; + DCHECK_EQ(intermediate_slot_desc_.type().type, + ColumnType::FromThrift(aggregate_fn.intermediate_type).type); + DCHECK_EQ(output_slot_desc_.type().type, ColumnType::FromThrift(fn_.ret_type).type); + + // Load the function pointers. Must have init() and update(). + if (aggregate_fn.init_fn_symbol.empty() || + aggregate_fn.update_fn_symbol.empty() || + (aggregate_fn.merge_fn_symbol.empty() && !aggregate_fn.is_analytic_only_fn)) { + // This path is only for partially implemented builtins. + DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN); + stringstream ss; + ss << "Function " << fn_.name.function_name << " is not implemented."; + return Status(ss.str()); + } + + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.init_fn_symbol, &init_fn_, &cache_entry_)); + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.update_fn_symbol, &update_fn_, &cache_entry_)); + + // Merge() is not defined for purely analytic function. 
+ if (!aggregate_fn.is_analytic_only_fn) { + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_)); + } + // Serialize(), GetValue(), Remove() and Finalize() are optional + if (!aggregate_fn.serialize_fn_symbol.empty()) { + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.serialize_fn_symbol, &serialize_fn_, &cache_entry_)); + } + if (!aggregate_fn.get_value_fn_symbol.empty()) { + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.get_value_fn_symbol, &get_value_fn_, &cache_entry_)); + } + if (!aggregate_fn.remove_fn_symbol.empty()) { + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + aggregate_fn.remove_fn_symbol, &remove_fn_, &cache_entry_)); + } + if (!aggregate_fn.finalize_fn_symbol.empty()) { + RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, + fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &cache_entry_)); + } + return Status::OK(); +} + +Status AggFn::Create(const TExpr& texpr, const RowDescriptor& row_desc, + const SlotDescriptor& intermediate_slot_desc, const SlotDescriptor& output_slot_desc, + RuntimeState* state, AggFn** agg_fn) { + *agg_fn = nullptr; + ObjectPool* pool = state->obj_pool(); + const TExprNode& texpr_node = texpr.nodes[0]; + DCHECK_EQ(texpr_node.node_type, TExprNodeType::AGGREGATE_EXPR); + if (!texpr_node.__isset.fn) { + return Status("Function not set in thrift AGGREGATE_EXPR node"); + } + AggFn* new_agg_fn = + pool->Add(new AggFn(texpr_node, intermediate_slot_desc, output_slot_desc)); + RETURN_IF_ERROR(Expr::CreateTree(texpr, pool, new_agg_fn)); + Status status = new_agg_fn->Init(row_desc, state); + if (UNLIKELY(!status.ok())) { + new_agg_fn->Close(); + return status; + } + for (ScalarExpr* input_expr : new_agg_fn->children()) { + int fn_ctx_idx = 0; + input_expr->AssignFnCtxIdx(&fn_ctx_idx); + } + *agg_fn = new_agg_fn; + 
return Status::OK(); +} + +FunctionContext::TypeDesc AggFn::GetIntermediateTypeDesc() const { + return AnyValUtil::ColumnTypeToTypeDesc(intermediate_slot_desc_.type()); +} + +FunctionContext::TypeDesc AggFn::GetOutputTypeDesc() const { + return AnyValUtil::ColumnTypeToTypeDesc(output_slot_desc_.type()); +} + +Status AggFn::CodegenUpdateOrMergeFunction(LlvmCodeGen* codegen, Function** uda_fn) { + const string& symbol = + is_merge_ ? fn_.aggregate_fn.merge_fn_symbol : fn_.aggregate_fn.update_fn_symbol; + vector<ColumnType> fn_arg_types; + for (ScalarExpr* input_expr : children()) { + fn_arg_types.push_back(input_expr->type()); + } + // The intermediate value is passed as the last argument. + fn_arg_types.push_back(intermediate_type()); + RETURN_IF_ERROR(codegen->LoadFunction(fn_, symbol, nullptr, fn_arg_types, + fn_arg_types.size(), false, uda_fn, &cache_entry_)); + + // Inline constants into the function body (if there is an IR body). + if (!(*uda_fn)->isDeclaration()) { + // TODO: IMPALA-4785: we should also replace references to GetIntermediateType() + // with constants. + codegen->InlineConstFnAttrs(GetOutputTypeDesc(), arg_type_descs_, *uda_fn); + *uda_fn = codegen->FinalizeFunction(*uda_fn); + if (*uda_fn == nullptr) { + return Status(TErrorCode::UDF_VERIFY_FAILED, symbol, fn_.hdfs_location); + } + } + return Status::OK(); +} + +void AggFn::Close() { + // This also closes all the input expressions. + Expr::Close(); +} + +void AggFn::Close(const vector<AggFn*>& exprs) { + for (AggFn* expr : exprs) expr->Close(); +} + +// Returns a human-readable description of this aggregate function of the form +// "AggFn(op=<agg_op_> <input expr> <input expr> ...)". +string AggFn::DebugString() const { + stringstream out; + out << "AggFn(op=" << agg_op_; + // Input expressions are separated by spaces. The closing parenthesis is emitted + // exactly once after the loop so that the parentheses stay balanced regardless of + // the number of input expressions. + for (ScalarExpr* input_expr : children()) { + out << " " << input_expr->DebugString(); + } + out << ")"; + return out.str(); +} + +string AggFn::DebugString(const vector<AggFn*>& agg_fns) { + stringstream out; + out << "["; + for (int i = 0; i < agg_fns.size(); ++i) { + out << (i == 0 ? 
"" : " ") << agg_fns[i]->DebugString(); + } + out << "]"; + return out.str(); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn.h b/be/src/exprs/agg-fn.h new file mode 100644 index 0000000..9c8dbe1 --- /dev/null +++ b/be/src/exprs/agg-fn.h @@ -0,0 +1,180 @@ + // Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_EXPRS_AGG_FN_H +#define IMPALA_EXPRS_AGG_FN_H + +#include "exprs/expr.h" +#include "runtime/descriptors.h" +#include "udf/udf.h" + +namespace impala { + +using impala_udf::FunctionContext; + +class LlvmCodeGen; +class MemPool; +class MemTracker; +class ObjectPool; +class RuntimeState; +class Tuple; +class TupleRow; +class TExprNode; + +/// --- AggFn overview +/// +/// An aggregate function generates an output over a set of tuple rows. +/// An example would be AVG() which computes the average of all input rows. +/// The built-in aggregate functions such as min, max, sum, avg, ndv etc are +/// in this category. 
+/// +/// --- Implementation +/// +/// AggFn contains the aggregation operations, pointers to the UDAF interface functions +/// implementing various states of aggregation and the descriptors for the intermediate +/// and output values. Please see udf/udf.h for details of the UDAF interfaces. +/// +/// AggFnEvaluator is the interface for evaluating aggregate functions against input +/// tuple rows. It invokes the following functions at different phases of the aggregation: +/// +/// init_fn_ : An initialization function that initializes the aggregate value. +/// +/// update_fn_ : An update function that processes the arguments for each row in the +/// query result set and accumulates an intermediate result. For example, +/// this function might increment a counter, append to a string buffer or +/// add the input to a cumulative sum. +/// +/// merge_fn_ : A merge function that combines multiple intermediate results into a +/// single value. +/// +/// serialize_fn_: A serialization function that flattens any intermediate values +/// containing pointers, and frees any memory allocated during the init, +/// update and merge phases. +/// +/// finalize_fn_ : A finalize function that either passes through the combined result +/// unchanged, or does one final transformation. Also frees the resources +/// allocated during init, update and merge phases. +/// +/// get_value_fn_: Used by AnalyticEval node to obtain the current intermediate value. +/// +/// remove_fn_ : Used by AnalyticEval node to undo the update to the intermediate value +/// by an input row as it falls out of a sliding window. +/// +class AggFn : public Expr { + public: + + /// Override the base class' implementation. + virtual bool IsAggFn() const { return true; } + + /// Enum for some built-in aggregation ops. + enum AggregationOp { + COUNT, + MIN, + MAX, + SUM, + AVG, + NDV, + OTHER, + }; + + /// Creates and initializes an aggregate function from 'texpr' and returns it in + /// 'agg_fn'. 
The returned AggFn lives in the ObjectPool of 'state'. 'row_desc' is + /// the row descriptor of the input tuple row; 'intermediate_slot_desc' is the slot + /// descriptor of the intermediate value; 'output_slot_desc' is the slot descriptor + /// of the output value. On failure, returns error status and sets 'agg_fn' to NULL. + static Status Create(const TExpr& texpr, const RowDescriptor& row_desc, + const SlotDescriptor& intermediate_slot_desc, + const SlotDescriptor& output_slot_desc, RuntimeState* state, AggFn** agg_fn) + WARN_UNUSED_RESULT; + + bool is_merge() const { return is_merge_; } + AggregationOp agg_op() const { return agg_op_; } + bool is_count_star() const { return agg_op_ == COUNT && children_.empty(); } + bool is_builtin() const { return fn_.binary_type == TFunctionBinaryType::BUILTIN; } + const std::string& fn_name() const { return fn_.name.function_name; } + const ColumnType& intermediate_type() const { return intermediate_slot_desc_.type(); } + const SlotDescriptor& intermediate_slot_desc() const { return intermediate_slot_desc_; } + // Output type is the same as Expr::type(). + const SlotDescriptor& output_slot_desc() const { return output_slot_desc_; } + void* remove_fn() const { return remove_fn_; } + void* merge_or_update_fn() const { return is_merge_ ? merge_fn_ : update_fn_; } + void* serialize_fn() const { return serialize_fn_; } + void* get_value_fn() const { return get_value_fn_; } + void* finalize_fn() const { return finalize_fn_; } + bool SupportsRemove() const { return remove_fn_ != nullptr; } + bool SupportsSerialize() const { return serialize_fn_ != nullptr; } + FunctionContext::TypeDesc GetIntermediateTypeDesc() const; + FunctionContext::TypeDesc GetOutputTypeDesc() const; + const std::vector<FunctionContext::TypeDesc>& arg_type_descs() const { + return arg_type_descs_; + } + + /// Generates an IR wrapper function to call update_fn_/merge_fn_ which may either be + /// cross-compiled or loaded from an external library. 
The generated IR function is + /// returned in 'uda_fn'. Returns error status on failure. + /// TODO: implement codegen path for init, finalize, serialize functions etc. + Status CodegenUpdateOrMergeFunction(LlvmCodeGen* codegen, llvm::Function** uda_fn) + WARN_UNUSED_RESULT; + + /// Releases all cache entries to libCache for all nodes in the expr tree. + virtual void Close(); + static void Close(const std::vector<AggFn*>& exprs); + + virtual std::string DebugString() const; + static std::string DebugString(const std::vector<AggFn*>& exprs); + + private: + friend class Expr; + friend class AggFnEvaluator; + + /// True if this is a merging aggregation. + const bool is_merge_; + + /// Slot into which Update()/Merge()/Serialize() write their result. Not owned. + const SlotDescriptor& intermediate_slot_desc_; + + /// Slot into which Finalize() results are written. Not owned. Identical to + /// intermediate_slot_desc_ if this agg fn has the same intermediate and result type. + const SlotDescriptor& output_slot_desc_; + + /// The types of the arguments to the aggregate function. + const std::vector<FunctionContext::TypeDesc> arg_type_descs_; + + /// The aggregation operation. + AggregationOp agg_op_; + + /// Function pointers for the different phases of the aggregate function. + void* init_fn_ = nullptr; + void* update_fn_ = nullptr; + void* remove_fn_ = nullptr; + void* merge_fn_ = nullptr; + void* serialize_fn_ = nullptr; + void* get_value_fn_ = nullptr; + void* finalize_fn_ = nullptr; + + AggFn(const TExprNode& node, const SlotDescriptor& intermediate_slot_desc, + const SlotDescriptor& output_slot_desc); + + /// Initializes the AggFn and its input expressions. May load the UDAF from LibCache + /// if necessary. 
+ virtual Status Init(const RowDescriptor& desc, RuntimeState* state) WARN_UNUSED_RESULT; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/aggregate-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc index 74badf6..c7f0658 100644 --- a/be/src/exprs/aggregate-functions-ir.cc +++ b/be/src/exprs/aggregate-functions-ir.cc @@ -34,7 +34,6 @@ #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" #include "exprs/anyval-util.h" -#include "exprs/expr.h" #include "exprs/hll-bias.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/aggregate-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h index c504468..5ebcb97 100644 --- a/be/src/exprs/aggregate-functions.h +++ b/be/src/exprs/aggregate-functions.h @@ -21,10 +21,21 @@ #include "udf/udf-internal.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + /// Collection of builtin aggregate functions. Aggregate functions implement /// the various phases of the aggregation: Init(), Update(), Serialize(), Merge(), /// and Finalize(). 
Not all functions need to implement all of the steps and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/anyval-util.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/anyval-util.h b/be/src/exprs/anyval-util.h index dfcf1ec..a18786e 100644 --- a/be/src/exprs/anyval-util.h +++ b/be/src/exprs/anyval-util.h @@ -27,10 +27,21 @@ #include "util/decimal-util.h" #include "util/hash-util.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + class ObjectPool; /// Utilities for AnyVals @@ -274,7 +285,7 @@ class AnyValUtil { reinterpret_cast<StringVal*>(dst)); if (type.type == TYPE_VARCHAR) { StringVal* sv = reinterpret_cast<StringVal*>(dst); - DCHECK(type.len >= 0); + DCHECK_GE(type.len, 0); DCHECK_LE(sv->len, type.len); } } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/bit-byte-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/bit-byte-functions.h b/be/src/exprs/bit-byte-functions.h index 63d4de6..e000047 100644 --- a/be/src/exprs/bit-byte-functions.h +++ b/be/src/exprs/bit-byte-functions.h @@ -20,10 +20,14 @@ #include "udf/udf.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; + class BitByteFunctions { public: // Count number of bits set in binary representation of integer types (aka popcount)
