IMPALA-4008: Don't bake ExprContext pointers into IR code. To allow generated code to be shared across multiple fragment instances, this change removes the ExprContext pointers baked into various IR functions (e.g. AGG/PAGG/hash-table).
Change-Id: I42039eed803a39fa716b9ed647510b6440974ae5 Reviewed-on: http://gerrit.cloudera.org:8080/4390 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6cc296ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6cc296ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6cc296ec Branch: refs/heads/master Commit: 6cc296ec85a4260446333f89b1f5df0d7bd1ec95 Parents: 4849e58 Author: Michael Ho <[email protected]> Authored: Fri Sep 9 00:56:46 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Tue Sep 27 20:20:17 2016 +0000 ---------------------------------------------------------------------- be/src/codegen/gen_ir_descriptions.py | 6 + be/src/codegen/llvm-codegen.cc | 4 + be/src/codegen/llvm-codegen.h | 3 + be/src/exec/aggregation-node-ir.cc | 8 + be/src/exec/aggregation-node.cc | 180 +++++++++++------- be/src/exec/aggregation-node.h | 20 +- be/src/exec/hash-table-ir.cc | 4 + be/src/exec/hash-table.cc | 50 +++-- be/src/exec/hash-table.h | 26 ++- be/src/exec/partitioned-aggregation-node-ir.cc | 8 +- be/src/exec/partitioned-aggregation-node.cc | 192 ++++++++++++-------- be/src/exec/partitioned-aggregation-node.h | 19 +- be/src/exprs/agg-fn-evaluator.h | 4 +- 13 files changed, 349 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/gen_ir_descriptions.py ---------------------------------------------------------------------- diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py index 9ba9d78..a12d73d 100755 --- a/be/src/codegen/gen_ir_descriptions.py +++ b/be/src/codegen/gen_ir_descriptions.py @@ -44,6 +44,8 @@ options, args = parser.parse_args() ir_functions = [ 
["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING", "ProcessRowBatchWithGrouping"], ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING", "ProcessRowBatchNoGrouping"], + ["AGG_NODE_GET_EXPR_CTX", "GetAggExprCtx"], + ["AGG_NODE_GET_FN_CTX", "GetAggFnCtx"], ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED", "PartitionedAggregationNode12ProcessBatchILb0"], ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED", @@ -52,6 +54,8 @@ ir_functions = [ "PartitionedAggregationNode22ProcessBatchNoGrouping"], ["PART_AGG_NODE_PROCESS_BATCH_STREAMING", "PartitionedAggregationNode21ProcessBatchStreaming"], + ["PART_AGG_NODE_GET_EXPR_CTX", + "PartitionedAggregationNode17GetAggExprContext"], ["AVG_UPDATE_BIGINT", "9AvgUpdateIN10impala_udf9BigIntVal"], ["AVG_UPDATE_DOUBLE", "9AvgUpdateIN10impala_udf9DoubleVal"], ["AVG_UPDATE_TIMESTAMP", "TimestampAvgUpdate"], @@ -89,6 +93,8 @@ ir_functions = [ ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN", "ProcessProbeBatchILi8"], ["PHJ_INSERT_BATCH", "9Partition11InsertBatch"], ["HASH_TABLE_GET_HASH_SEED", "GetHashSeed"], + ["HASH_TABLE_GET_BUILD_EXPR_CTX", "HashTableCtx15GetBuildExprCtx"], + ["HASH_TABLE_GET_PROBE_EXPR_CTX", "HashTableCtx15GetProbeExprCtx"], ["HLL_UPDATE_BOOLEAN", "HllUpdateIN10impala_udf10BooleanVal"], ["HLL_UPDATE_TINYINT", "HllUpdateIN10impala_udf10TinyIntVal"], ["HLL_UPDATE_SMALLINT", "HllUpdateIN10impala_udf11SmallIntVal"], http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/llvm-codegen.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index b2a3b18..b107c51 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -581,6 +581,10 @@ PointerType* LlvmCodeGen::GetPtrType(Type* type) { return PointerType::get(type, 0); } +PointerType* LlvmCodeGen::GetPtrPtrType(Type* type) { + return PointerType::get(PointerType::get(type, 0), 0); +} + // Llvm doesn't let you create a PointerValue from a c-side ptr. 
Instead // cast it to an int and then to 'type'. Value* LlvmCodeGen::CastPtrToLlvmPtr(Type* type, const void* ptr) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/llvm-codegen.h ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h index fa9f1b1..2ef936f 100644 --- a/be/src/codegen/llvm-codegen.h +++ b/be/src/codegen/llvm-codegen.h @@ -218,6 +218,9 @@ class LlvmCodeGen { /// Return a pointer type to 'type' llvm::PointerType* GetPtrType(llvm::Type* type); + /// Return a pointer to pointer type to 'type'. + llvm::PointerType* GetPtrPtrType(llvm::Type* type); + /// Returns llvm type for the column type llvm::Type* GetType(const ColumnType& type); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node-ir.cc b/be/src/exec/aggregation-node-ir.cc index 185196a..8050b58 100644 --- a/be/src/exec/aggregation-node-ir.cc +++ b/be/src/exec/aggregation-node-ir.cc @@ -28,6 +28,14 @@ using namespace impala; // Functions in this file are cross compiled to IR with clang. 
These functions // are modified at runtime with a query specific codegen'd UpdateAggTuple +FunctionContext* AggregationNode::GetAggFnCtx(int i) const { + return agg_fn_ctxs_[i]; +} + +ExprContext* AggregationNode::GetAggExprCtx(int i) const { + return agg_expr_ctxs_[i]; +} + void AggregationNode::ProcessRowBatchNoGrouping(RowBatch* batch) { for (int i = 0; i < batch->num_rows(); ++i) { UpdateTuple(singleton_intermediate_tuple_, batch->GetRow(i)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc index 2b9550a..909d42b 100644 --- a/be/src/exec/aggregation-node.cc +++ b/be/src/exec/aggregation-node.cc @@ -81,6 +81,18 @@ Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggFnEvaluator::Create( pool_, tnode.agg_node.aggregate_functions[i], &evaluator)); aggregate_evaluators_.push_back(evaluator); + ExprContext* agg_expr_ctx; + if (evaluator->input_expr_ctxs().size() == 1) { + agg_expr_ctx = evaluator->input_expr_ctxs()[0]; + } else { + // CodegenUpdateSlot() can only support aggregate operator with only one ExprContext + // so it doesn't support operator such as group_concat. There are also aggregate + // operators with no ExprContext (e.g. count(*)). In cases above, 'agg_expr_ctxs_' + // will contain NULL for that entry. 
+ DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star()); + agg_expr_ctx = NULL; + } + agg_expr_ctxs_.push_back(agg_expr_ctx); } return Status::OK(); } @@ -302,6 +314,7 @@ void AggregationNode::Close(RuntimeState* state) { if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll(); if (hash_tbl_.get() != NULL) hash_tbl_->Close(); + agg_expr_ctxs_.clear(); DCHECK(agg_fn_ctxs_.empty() || aggregate_evaluators_.size() == agg_fn_ctxs_.size()); for (int i = 0; i < aggregate_evaluators_.size(); ++i) { aggregate_evaluators_[i]->Close(state); @@ -432,23 +445,26 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) { } // IR Generation for updating a single aggregation slot. Signature is: -// void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row) +// void UpdateSlot(FunctionContext* fn_ctx, ExprContext* expr_ctx, +// AggTuple* agg_tuple, char** row) // // The IR for sum(double_col) is: // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx, -// { i8, double }* %agg_tuple, -// %"class.impala::TupleRow"* %row) #20 { +// %"class.impala::ExprContext"* %expr_ctx, +// { i8, [7 x i8], double }* %agg_tuple, +// %"class.impala::TupleRow"* %row) #34 { // entry: -// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr -// (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row) +// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx, +// %"class.impala::TupleRow"* %row) // %0 = extractvalue { i8, double } %src, 0 // %is_null = trunc i8 %0 to i1 // br i1 %is_null, label %ret, label %src_not_null // // src_not_null: ; preds = %entry -// %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1 -// call void @SetNotNull({ i8, double }* %agg_tuple) -// %dst_val = load double* %dst_slot_ptr +// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double }, +// { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2 +// call 
void @SetNotNull({ i8, [7 x i8], double }* %agg_tuple) +// %dst_val = load double, double* %dst_slot_ptr // %val = extractvalue { i8, double } %src, 1 // %1 = fadd double %dst_val, %val // store double %1, double* %dst_slot_ptr @@ -460,25 +476,27 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) { // // The IR for ndv(double_col) is: // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx, -// { i8, %"struct.impala::StringValue" }* %agg_tuple, -// %"class.impala::TupleRow"* %row) #20 { +// %"class.impala::ExprContext"* %expr_ctx, +// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, +// %"class.impala::TupleRow"* %row) #34 { // entry: // %dst_lowered_ptr = alloca { i64, i8* } -// %src_lowered_ptr = alloca { i8, double } -// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr -// (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row) -// %0 = extractvalue { i8, double } %src, 0 -// %is_null = trunc i8 %0 to i1 +// %src_lowered_ptr = alloca { i64, i8* } +// %src = call { i64, i8* } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx, +// %"class.impala::TupleRow"* %row) +// %0 = extractvalue { i64, i8* } %src, 0 +// %is_null = trunc i64 %0 to i1 // br i1 %is_null, label %ret, label %src_not_null // // src_not_null: ; preds = %entry -// %dst_slot_ptr = getelementptr inbounds -// { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1 -// call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple) -// %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr -// store { i8, double } %src, { i8, double }* %src_lowered_ptr -// %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr -// to %"struct.impala_udf::DoubleVal"* +// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], %"struct.impala::StringValue" }, +// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 2 +// call void @SetNotNull({ i8, [7 x i8], 
%"struct.impala::StringValue" }* %agg_tuple) +// %dst_val = +// load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr +// store { i64, i8* } %src, { i64, i8* }* %src_lowered_ptr +// %src_unlowered_ptr = +// bitcast { i64, i8* }* %src_lowered_ptr to %"struct.impala_udf::StringVal"* // %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0 // %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1 // %len = extractvalue %"struct.impala::StringValue" %dst_val, 1 @@ -489,18 +507,18 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) { // %5 = or i64 %4, %3 // %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0 // store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr -// %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr -// to %"struct.impala_udf::StringVal"* -// call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx, -// %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr, -// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr) -// %anyval_result = load { i64, i8* }* %dst_lowered_ptr -// %6 = extractvalue { i64, i8* } %anyval_result, 1 -// %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0 -// %8 = extractvalue { i64, i8* } %anyval_result, 0 -// %9 = ashr i64 %8, 32 -// %10 = trunc i64 %9 to i32 -// %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1 +// %dst_unlowered_ptr = +// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"* +// call void @HllMerge(%"class.impala_udf::FunctionContext"* %fn_ctx, +// %"struct.impala_udf::StringVal"* %src_unlowered_ptr, +// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr) +// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr +// %6 = extractvalue { i64, i8* } %anyval_result, 0 +// %7 = ashr i64 %6, 32 +// %8 = trunc i64 %7 to i32 +// %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1 +// %10 = extractvalue { i64, i8* } 
%anyval_result, 1 +// %11 = insertvalue %"struct.impala::StringValue" %9, i8* %10, 0 // store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr // br label %ret // @@ -512,6 +530,8 @@ llvm::Function* AggregationNode::CodegenUpdateSlot( LlvmCodeGen* codegen; if (!state->GetCodegen(&codegen).ok()) return NULL; + // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator + // with multiple input expressions (e.g. group_concat). DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1); ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0]; Expr* input_expr = input_expr_ctx->root(); @@ -525,33 +545,34 @@ llvm::Function* AggregationNode::CodegenUpdateSlot( } DCHECK(agg_expr_fn != NULL); - PointerType* fn_ctx_type = + PointerType* fn_ctx_ptr_type = codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME); + PointerType* expr_ctx_ptr_type = codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME); StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen); - PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0); + PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct); PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); // Create UpdateSlot prototype LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type()); - prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_ptr_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_ctx", expr_ctx_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); LlvmCodeGen::LlvmBuilder builder(codegen->context()); - Value* args[3]; + Value* args[4]; Function* fn = prototype.GeneratePrototype(&builder, &args[0]); Value* fn_ctx_arg = args[0]; - Value* agg_tuple_arg = args[1]; - Value* 
row_arg = args[2]; + Value* expr_ctx_arg = args[1]; + Value* agg_tuple_arg = args[2]; + Value* row_arg = args[3]; BasicBlock* src_not_null_block = BasicBlock::Create(codegen->context(), "src_not_null", fn); BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn); // Call expr function to get src slot value - Value* ctx_arg = codegen->CastPtrToLlvmPtr( - codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx); - Value* agg_expr_fn_args[] = { ctx_arg, row_arg }; + Value* agg_expr_fn_args[] = { expr_ctx_arg, row_arg }; CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped( codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src"); @@ -660,23 +681,38 @@ llvm::Function* AggregationNode::CodegenUpdateSlot( // For the query: // select count(*), count(int_col), sum(double_col) the IR looks like: // +// ; Function Attrs: alwaysinline // define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr, // %"class.impala::Tuple"* %agg_tuple, -// %"class.impala::TupleRow"* %tuple_row) #20 { +// %"class.impala::TupleRow"* %tuple_row) #34 { // entry: -// %tuple = bitcast %"class.impala::Tuple"* %agg_tuple to { i8, i64, i64, double }* -// %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple, i32 0, i32 1 -// %count_star_val = load i64* %src_slot +// %tuple = +// bitcast %"class.impala::Tuple"* %agg_tuple to { i8, [7 x i8], i64, i64, double }* +// %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double }, +// { i8, [7 x i8], i64, i64, double }* %tuple, i32 0, i32 2 +// %count_star_val = load i64, i64* %src_slot // %count_star_inc = add i64 %count_star_val, 1 // store i64 %count_star_inc, i64* %src_slot -// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* inttoptr -// (i64 44521296 to %"class.impala_udf::FunctionContext"*), -// { i8, i64, i64, double }* %tuple, +// %0 = call %"class.impala_udf::FunctionContext"* +// @_ZNK6impala15AggregationNode11GetAggFnCtxEi( +// 
%"class.impala::AggregationNode"* %this_ptr, i32 1) +// %1 = call %"class.impala::ExprContext"* +// @_ZNK6impala15AggregationNode13GetAggExprCtxEi( +// %"class.impala::AggregationNode"* %this_ptr, i32 1) +// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %0, +// %"class.impala::ExprContext"* %1, +// { i8, [7 x i8], i64, i64, double }* %tuple, // %"class.impala::TupleRow"* %tuple_row) -// call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* inttoptr -// (i64 44521328 to %"class.impala_udf::FunctionContext"*), -// { i8, i64, i64, double }* %tuple, -// %"class.impala::TupleRow"* %tuple_row) +// %2 = call %"class.impala_udf::FunctionContext"* +// @_ZNK6impala15AggregationNode11GetAggFnCtxEi( +// %"class.impala::AggregationNode"* %this_ptr, i32 2) +// %3 = call %"class.impala::ExprContext"* +// @_ZNK6impala15AggregationNode13GetAggExprCtxEi( +// %"class.impala::AggregationNode"* %this_ptr, i32 2) +// call void @UpdateSlot.3(%"class.impala_udf::FunctionContext"* %2, +// %"class.impala::ExprContext"* %3, +// { i8, [7 x i8], i64, i64, double }* %tuple, +// %"class.impala::TupleRow"* %tuple_row) // ret void // } Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) { @@ -721,12 +757,13 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) { DCHECK(agg_tuple_type != NULL); DCHECK(tuple_row_type != NULL); - PointerType* agg_node_ptr_type = PointerType::get(agg_node_type, 0); - PointerType* agg_tuple_ptr_type = PointerType::get(agg_tuple_type, 0); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); + PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type); + PointerType* agg_tuple_ptr_type = codegen->GetPtrType(agg_tuple_type); + PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type); // Signature for UpdateTuple is - // void UpdateTuple(AggregationNode* this, Tuple* tuple, TupleRow* row) + // void UpdateTuple(AggregationNode* this, FunctionContext** fn_ctx, + // ExprContext** 
expr_ctx, Tuple* tuple, TupleRow* row) // This signature needs to match the non-codegen'd signature exactly. StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen); PointerType* tuple_ptr = PointerType::get(tuple_struct, 0); @@ -741,7 +778,15 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) { // Cast the parameter types to the internal llvm runtime types. // TODO: get rid of this by using right type in function signature - args[1] = builder.CreateBitCast(args[1], tuple_ptr, "tuple"); + Value* this_arg = args[0]; + Value* agg_tuple_arg = builder.CreateBitCast(args[1], tuple_ptr, "tuple"); + Value* row_arg = args[2]; + + Function* get_fn_ctx_fn = codegen->GetFunction(IRFunction::AGG_NODE_GET_FN_CTX, false); + DCHECK(get_fn_ctx_fn != NULL); + Function* get_expr_ctx_fn = + codegen->GetFunction(IRFunction::AGG_NODE_GET_EXPR_CTX, false); + DCHECK(get_expr_ctx_fn != NULL); // Loop over each expr and generate the IR for that slot. If the expr is not // count(*), generate a helper IR function to update the slot and call that. @@ -754,18 +799,23 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) { // increment the slot by the number of rows in the batch. 
int field_idx = slot_desc->llvm_field_idx(); Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1); - Value* slot_ptr = builder.CreateStructGEP(NULL, args[1], field_idx, "src_slot"); + Value* slot_ptr = builder.CreateStructGEP(NULL, agg_tuple_arg, field_idx, + "src_slot"); Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val"); Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc"); builder.CreateStore(count_inc, slot_ptr); } else { Function* update_slot_fn = CodegenUpdateSlot(state, evaluator, slot_desc); if (update_slot_fn == NULL) return NULL; - Value* fn_ctx_arg = codegen->CastPtrToLlvmPtr( - codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME), - agg_fn_ctxs_[i]); - builder.CreateCall(update_slot_fn, - ArrayRef<Value*>({fn_ctx_arg, args[1], args[2]})); + // Call GetAggFnCtx() to get the function context. + Value* get_fn_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) }; + Value* fn_ctx = builder.CreateCall(get_fn_ctx_fn, get_fn_ctx_args); + // Call GetAggExprCtx() to get the expression context. 
+ DCHECK(agg_expr_ctxs_[i] != NULL); + Value* get_expr_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) }; + Value* expr_ctx = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args); + Value* update_slot_args[] = { fn_ctx, expr_ctx, agg_tuple_arg, row_arg }; + builder.CreateCall(update_slot_fn, update_slot_args); } } builder.CreateRetVoid(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h index eaaf97c..5d87d82 100644 --- a/be/src/exec/aggregation-node.h +++ b/be/src/exec/aggregation-node.h @@ -69,12 +69,20 @@ class AggregationNode : public ExecNode { boost::scoped_ptr<OldHashTable> hash_tbl_; OldHashTable::Iterator output_iterator_; + /// The list of all aggregate operations for this exec node. std::vector<AggFnEvaluator*> aggregate_evaluators_; - /// FunctionContext for each agg fn and backing pool. + /// FunctionContexts and backing MemPools of 'aggregate_evaluators_'. + /// FunctionContexts objects are stored in ObjectPool of RuntimeState. std::vector<impala_udf::FunctionContext*> agg_fn_ctxs_; boost::scoped_ptr<MemPool> agg_fn_pool_; + /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed + /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime. + /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no + /// Expr in the aggregate evaluator (e.g. count(*)). + std::vector<ExprContext*> agg_expr_ctxs_; + /// Exprs used to evaluate input rows std::vector<ExprContext*> probe_expr_ctxs_; /// Exprs used to insert constructed aggregation tuple into the hash table. @@ -124,7 +132,7 @@ class AggregationNode : public ExecNode { Tuple* ConstructIntermediateTuple(); /// Updates the aggregation intermediate tuple 'tuple' with aggregation values - /// computed over 'row'. 
+ /// computed over 'row'. This function is replaced by codegen. void UpdateTuple(Tuple* tuple, TupleRow* row); /// Called on the intermediate tuple of each group after all input rows have been @@ -135,6 +143,14 @@ class AggregationNode : public ExecNode { /// Returns the tuple holding the final aggregate values. Tuple* FinalizeTuple(Tuple* tuple, MemPool* pool); + /// Accessor for the function context of an AggFnEvaluator. Used only in codegen'ed + /// version of the UpdateSlot(). + FunctionContext* IR_ALWAYS_INLINE GetAggFnCtx(int i) const; + + /// Accessor for the expression context of an AggFnEvaluator. Used only in codegen'ed + /// version of the UpdateSlot(). + ExprContext* IR_ALWAYS_INLINE GetAggExprCtx(int i) const; + /// Do the aggregation for all tuple rows in the batch void ProcessRowBatchNoGrouping(RowBatch* batch); void ProcessRowBatchWithGrouping(RowBatch* batch); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-ir.cc b/be/src/exec/hash-table-ir.cc index ce9c317..a702736 100644 --- a/be/src/exec/hash-table-ir.cc +++ b/be/src/exec/hash-table-ir.cc @@ -23,4 +23,8 @@ using namespace impala; uint32_t HashTableCtx::GetHashSeed() const { return seeds_[level_]; } +ExprContext* HashTableCtx::GetBuildExprCtx(int i) const { return build_expr_ctxs_[i]; } + +ExprContext* HashTableCtx::GetProbeExprCtx(int i) const { return probe_expr_ctxs_[i]; } + #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index dfa700e..0d780b9 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -183,8 +183,8 @@ bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs return has_null; } -uint32_t 
HashTableCtx::HashVariableLenRow( - const uint8_t* expr_values, const uint8_t* expr_values_null) const { +uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values, + const uint8_t* expr_values_null) const { uint32_t hash = seeds_[level_]; int var_result_offset = expr_values_cache_.var_result_offset(); // Hash the non-var length portions (if there are any) @@ -699,30 +699,36 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function** RETURN_IF_ERROR(state->GetCodegen(&codegen)); // Get types to generate function prototype - Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - DCHECK(tuple_row_type != NULL); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); - Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME); DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); + PointerType* this_ptr_type = codegen->GetPtrType(this_type); + Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); + DCHECK(tuple_row_type != NULL); + PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type); LlvmCodeGen::FnPrototype prototype(codegen, build ? 
"EvalBuildRow" : "EvalProbeRow", codegen->GetType(TYPE_BOOLEAN)); prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type())); - prototype.AddArgument( - LlvmCodeGen::NamedVariable("expr_values_null", codegen->ptr_type())); + prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values_null", + codegen->ptr_type())); LLVMContext& context = codegen->context(); LlvmCodeGen::LlvmBuilder builder(context); Value* args[4]; *fn = prototype.GeneratePrototype(&builder, args); + Value* this_ptr = args[0]; Value* row = args[1]; Value* expr_values = args[2]; Value* expr_values_null = args[3]; Value* has_null = codegen->false_value(); + IRFunction::Type get_expr_ctx_fn_name = build ? + IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX : + IRFunction::HASH_TABLE_GET_PROBE_EXPR_CTX; + Function* get_expr_ctx_fn = codegen->GetFunction(get_expr_ctx_fn_name, false); + DCHECK(get_expr_ctx_fn != NULL); + for (int i = 0; i < ctxs.size(); ++i) { // TODO: refactor this to somewhere else? 
This is not hash table specific except for // the null handling bit and would be used for anyone that needs to materialize a @@ -748,8 +754,8 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function** status.GetDetail())); } - Value* ctx_arg = codegen->CastPtrToLlvmPtr( - codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), ctxs[i]); + Value* get_expr_ctx_args[] = { this_ptr, codegen->GetIntConstant(TYPE_INT, i) }; + Value* ctx_arg = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args, "expr_ctx"); Value* expr_fn_args[] = { ctx_arg, row }; CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped( codegen, &builder, ctxs[i]->root()->type(), expr_fn, expr_fn_args, "result"); @@ -845,7 +851,7 @@ Status HashTableCtx::CodegenHashRow(RuntimeState* state, bool use_murmur, Functi // Get types to generate function prototype Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME); DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); + PointerType* this_ptr_type = codegen->GetPtrType(this_type); LlvmCodeGen::FnPrototype prototype( codegen, (use_murmur ? 
"MurmurHashRow" : "HashRow"), codegen->GetType(TYPE_INT)); @@ -1050,13 +1056,13 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality LlvmCodeGen* codegen; RETURN_IF_ERROR(state->GetCodegen(&codegen)); // Get types to generate function prototype + Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME); + DCHECK(this_type != NULL); + PointerType* this_ptr_type = codegen->GetPtrType(this_type); Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); DCHECK(tuple_row_type != NULL); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); + PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type); - Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME); - DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN)); prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); @@ -1068,10 +1074,15 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality LlvmCodeGen::LlvmBuilder builder(context); Value* args[4]; *fn = prototype.GeneratePrototype(&builder, args); + Value* this_ptr = args[0]; Value* row = args[1]; Value* expr_values = args[2]; Value* expr_values_null = args[3]; + Function* get_expr_ctx_fn = + codegen->GetFunction(IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX, false); + DCHECK(get_expr_ctx_fn != NULL); + BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn); for (int i = 0; i < build_expr_ctxs_.size(); ++i) { BasicBlock* null_block = BasicBlock::Create(context, "null", *fn); @@ -1088,8 +1099,11 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality status.GetDetail())); } - Value* ctx_arg = codegen->CastPtrToLlvmPtr( - codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), 
build_expr_ctxs_[i]); + // Load ExprContext* from 'build_expr_ctxs_'. + Value* get_expr_ctx_args[] = { this_ptr, codegen->GetIntConstant(TYPE_INT, i) }; + Value* ctx_arg = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args, "expr_ctx"); + + // Evaluate the expression. Value* expr_fn_args[] = { ctx_arg, row }; CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, build_expr_ctxs_[i]->root()->type(), expr_fn, expr_fn_args, "result"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index 58078ad..fead1f7 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -300,7 +300,9 @@ class HashTableCtx { uint32_t ALWAYS_INLINE CurExprValuesHash() const { return *cur_expr_values_hash_; } /// Sets the hash values for the current row. - void ALWAYS_INLINE SetCurExprValuesHash(uint32_t hash) { *cur_expr_values_hash_ = hash; } + void ALWAYS_INLINE SetCurExprValuesHash(uint32_t hash) { + *cur_expr_values_hash_ = hash; + } /// Returns a pointer to the expression value at 'expr_idx' in 'expr_values'. uint8_t* ExprValuePtr(uint8_t* expr_values, int expr_idx) const; @@ -410,19 +412,19 @@ class HashTableCtx { uint32_t Hash(const void* input, int len, uint32_t hash) const; /// Evaluate 'row' over build exprs, storing values into 'expr_values' and nullness into - /// 'expr_values_null'. This will be replaced by codegen. We do not want this - /// function inlined when cross compiled because we need to be able to differentiate - /// between EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked - /// into the codegen'd function. - bool IR_NO_INLINE EvalBuildRow( - const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) { + /// 'expr_values_null'. This will be replaced by codegen. 
We do not want this function + /// inlined when cross compiled because we need to be able to differentiate between + /// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked into the + /// codegen'd function. + bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values, + uint8_t* expr_values_null) { return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null); } /// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness /// into 'expr_values_null'. This will be replaced by codegen. - bool IR_NO_INLINE EvalProbeRow( - const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) { + bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values, + uint8_t* expr_values_null) { return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null); } @@ -454,12 +456,16 @@ class HashTableCtx { } /// Cross-compiled function to access member variables used in CodegenHashRow(). - uint32_t GetHashSeed() const; + uint32_t IR_ALWAYS_INLINE GetHashSeed() const; /// Functions to be replaced by codegen to specialize the hash table. bool IR_NO_INLINE stores_nulls() const { return stores_nulls_; } bool IR_NO_INLINE finds_some_nulls() const { return finds_some_nulls_; } + /// Cross-compiled function to access the build/probe expression context. 
+ ExprContext* IR_ALWAYS_INLINE GetBuildExprCtx(int i) const; + ExprContext* IR_ALWAYS_INLINE GetProbeExprCtx(int i) const; + const std::vector<ExprContext*>& build_expr_ctxs_; const std::vector<ExprContext*>& probe_expr_ctxs_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc index 194f6c4..ed95844 100644 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ b/be/src/exec/partitioned-aggregation-node-ir.cc @@ -26,6 +26,10 @@ using namespace impala; +ExprContext* PartitionedAggregationNode::GetAggExprContext(int i) const { + return agg_expr_ctxs_[i]; +} + Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { Tuple* output_tuple = singleton_output_tuple_; FOREACH_ROW(batch, 0, batch_iter) { @@ -202,7 +206,7 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, DCHECK(!process_batch_status_.ok()); return process_batch_status_; } - UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false); + UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row); out_batch_iterator.Get()->SetTuple(0, intermediate_tuple); out_batch_iterator.Next(); out_batch->CommitLastRow(); @@ -250,7 +254,7 @@ bool PartitionedAggregationNode::TryAddToHashTable( } } - UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row, false); + UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row); return true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index eb5addc..ba2d9f7 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ 
b/be/src/exec/partitioned-aggregation-node.cc @@ -153,6 +153,18 @@ Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* st RETURN_IF_ERROR(AggFnEvaluator::Create( pool_, tnode.agg_node.aggregate_functions[i], &evaluator)); aggregate_evaluators_.push_back(evaluator); + ExprContext* agg_expr_ctx; + if (evaluator->input_expr_ctxs().size() == 1) { + agg_expr_ctx = evaluator->input_expr_ctxs()[0]; + } else { + // CodegenUpdateSlot() can only support aggregate operator with only one ExprContext + // so it doesn't support operator such as group_concat. There are also aggregate + // operators with no ExprContext (e.g. count(*)). In cases above, 'agg_expr_ctxs_' + // will contain NULL for that entry. + DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star()); + agg_expr_ctx = NULL; + } + agg_expr_ctxs_.push_back(agg_expr_ctx); } return Status::OK(); } @@ -696,6 +708,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) { for (int i = 0; i < aggregate_evaluators_.size(); ++i) { aggregate_evaluators_[i]->Close(state); } + agg_expr_ctxs_.clear(); for (int i = 0; i < agg_fn_ctxs_.size(); ++i) { agg_fn_ctxs_[i]->impl()->Close(); } @@ -1407,23 +1420,28 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { } // IR Generation for updating a single aggregation slot. 
Signature is: -// void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row) +// void UpdateSlot(FunctionContext* agg_fn_ctx, ExprContext* agg_expr_ctx, +// AggTuple* agg_tuple, char** row) // // The IR for sum(double_col) is: -// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx, -// { i8, double }* %agg_tuple, -// %"class.impala::TupleRow"* %row) #20 { +// +// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, +// %"class.impala::ExprContext"* %agg_expr_ctx, +// { i8, [7 x i8], double }* %agg_tuple, +// %"class.impala::TupleRow"* %row) #34 { +// // entry: -// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr -// (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row) +// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %agg_expr_ctx, +// %"class.impala::TupleRow"* %row) // %0 = extractvalue { i8, double } %src, 0 // %is_null = trunc i8 %0 to i1 // br i1 %is_null, label %ret, label %src_not_null // // src_not_null: ; preds = %entry -// %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1 -// call void @SetNotNull({ i8, double }* %agg_tuple) -// %dst_val = load double* %dst_slot_ptr +// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double }, +// { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2 +// call void @SetNotNull({ i8, [7 x i8], double }* %agg_tuple) +// %dst_val = load double, double* %dst_slot_ptr // %val = extractvalue { i8, double } %src, 1 // %1 = fadd double %dst_val, %val // store double %1, double* %dst_slot_ptr @@ -1434,48 +1452,51 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { // } // // The IR for ndv(double_col) is: -// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx, -// { i8, %"struct.impala::StringValue" }* %agg_tuple, -// %"class.impala::TupleRow"* %row) #20 { +// +// define void 
@UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, +// %"class.impala::ExprContext"* %agg_expr_ctx, +// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, +// %"class.impala::TupleRow"* %row) #34 { // entry: // %dst_lowered_ptr = alloca { i64, i8* } // %src_lowered_ptr = alloca { i8, double } -// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr -// (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row) +// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %agg_expr_ctx, +// %"class.impala::TupleRow"* %row) // %0 = extractvalue { i8, double } %src, 0 // %is_null = trunc i8 %0 to i1 // br i1 %is_null, label %ret, label %src_not_null // // src_not_null: ; preds = %entry -// %dst_slot_ptr = getelementptr inbounds -// { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1 -// call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple) -// %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr +// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], %"struct.impala::StringValue" }, +// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 2 +// call void @SetNotNull({ i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple) +// %dst_val = +// load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr // store { i8, double } %src, { i8, double }* %src_lowered_ptr -// %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr -// to %"struct.impala_udf::DoubleVal"* +// %src_unlowered_ptr = +// bitcast { i8, double }* %src_lowered_ptr to %"struct.impala_udf::DoubleVal"* // %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0 -// %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1 +// %dst = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1 // %len = extractvalue %"struct.impala::StringValue" %dst_val, 1 -// %1 = extractvalue { i64, i8* } %dst_stringval, 0 +// %1 = 
extractvalue { i64, i8* } %dst, 0 // %2 = zext i32 %len to i64 // %3 = shl i64 %2, 32 // %4 = and i64 %1, 4294967295 // %5 = or i64 %4, %3 -// %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0 -// store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr -// %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr -// to %"struct.impala_udf::StringVal"* -// call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx, +// %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0 +// store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr +// %dst_unlowered_ptr = +// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"* +// call void @HllUpdate(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, // %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr, // %"struct.impala_udf::StringVal"* %dst_unlowered_ptr) -// %anyval_result = load { i64, i8* }* %dst_lowered_ptr -// %6 = extractvalue { i64, i8* } %anyval_result, 1 -// %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0 -// %8 = extractvalue { i64, i8* } %anyval_result, 0 -// %9 = ashr i64 %8, 32 -// %10 = trunc i64 %9 to i32 -// %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1 +// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr +// %6 = extractvalue { i64, i8* } %anyval_result, 0 +// %7 = ashr i64 %6, 32 +// %8 = trunc i64 %7 to i32 +// %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1 +// %10 = extractvalue { i64, i8* } %anyval_result, 1 +// %11 = insertvalue %"struct.impala::StringValue" %9, i8* %10, 0 // store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr // br label %ret // @@ -1487,53 +1508,56 @@ Status PartitionedAggregationNode::CodegenUpdateSlot( LlvmCodeGen* codegen; RETURN_IF_ERROR(state_->GetCodegen(&codegen)); + // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator + // with multiple input expressions (e.g. 
group_concat). DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1); - ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0]; - Expr* input_expr = input_expr_ctx->root(); + ExprContext* agg_expr_ctx = evaluator->input_expr_ctxs()[0]; + Expr* agg_expr = agg_expr_ctx->root(); // TODO: implement timestamp - if (input_expr->type().type == TYPE_TIMESTAMP && + if (agg_expr->type().type == TYPE_TIMESTAMP && evaluator->agg_op() != AggFnEvaluator::AVG) { return Status("PartitionedAggregationNode::CodegenUpdateSlot(): timestamp input type " "NYI"); } Function* agg_expr_fn; - RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(state_, &agg_expr_fn)); + RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(state_, &agg_expr_fn)); PointerType* fn_ctx_type = codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME); + PointerType* expr_ctx_type = codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME); StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen); if (tuple_struct == NULL) { return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to generate " "intermediate tuple desc"); } - PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0); + PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct); PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); // Create UpdateSlot prototype LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type()); - prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctx", fn_ctx_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_expr_ctx", expr_ctx_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); LlvmCodeGen::LlvmBuilder builder(codegen->context()); - Value* args[3]; + Value* args[4]; *fn = prototype.GeneratePrototype(&builder, &args[0]); - 
Value* fn_ctx_arg = args[0]; - Value* agg_tuple_arg = args[1]; - Value* row_arg = args[2]; + Value* agg_fn_ctx_arg = args[0]; + Value* agg_expr_ctx_arg = args[1]; + Value* agg_tuple_arg = args[2]; + Value* row_arg = args[3]; BasicBlock* src_not_null_block = BasicBlock::Create(codegen->context(), "src_not_null", *fn); BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", *fn); // Call expr function to get src slot value - Value* expr_ctx = codegen->CastPtrToLlvmPtr( - codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx); - Value* agg_expr_fn_args[] = { expr_ctx, row_arg }; + Value* agg_expr_fn_args[] = { agg_expr_ctx_arg, row_arg }; CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped( - codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src"); + codegen, &builder, agg_expr->type(), agg_expr_fn, agg_expr_fn_args, "src"); Value* src_is_null = src.GetIsNull(); builder.CreateCondBr(src_is_null, ret_block, src_not_null_block); @@ -1597,7 +1621,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot( // Clone and replace constants. 
ir_fn = codegen->CloneFunction(ir_fn); vector<FunctionContext::TypeDesc> arg_types; - arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(input_expr->type())); + arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(agg_expr->type())); Expr::InlineConstants(AnyValUtil::ColumnTypeToTypeDesc(dst_type), arg_types, codegen, ir_fn); @@ -1606,7 +1630,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot( *fn, LlvmCodeGen::NamedVariable("src_lowered_ptr", src.value()->getType())); builder.CreateStore(src.value(), src_lowered_ptr); Type* unlowered_ptr_type = - CodegenAnyVal::GetUnloweredPtrType(codegen, input_expr->type()); + CodegenAnyVal::GetUnloweredPtrType(codegen, agg_expr->type()); Value* src_unlowered_ptr = builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type, "src_unlowered_ptr"); @@ -1624,7 +1648,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot( // Call 'ir_fn' builder.CreateCall(ir_fn, - ArrayRef<Value*>({fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr})); + ArrayRef<Value*>({agg_fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr})); // Convert StringVal intermediate 'dst_arg' back to StringValue Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result"); @@ -1656,28 +1680,41 @@ Status PartitionedAggregationNode::CodegenUpdateSlot( // For the query: // select count(*), count(int_col), sum(double_col) the IR looks like: // - // ; Function Attrs: alwaysinline // define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr, // %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, // %"class.impala::Tuple"* %tuple, // %"class.impala::TupleRow"* %row, -// i1 %is_merge) #20 { +// i1 %is_merge) #34 { // entry: -// %tuple1 = bitcast %"class.impala::Tuple"* %tuple to { i8, i64, i64, double }* -// %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple1, i32 0, i32 1 -// %count_star_val = load i64* %src_slot +// %tuple1 = +// bitcast %"class.impala::Tuple"* %tuple to { i8, [7 x i8], i64, i64, double }* +// 
%src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double }, +// { i8, [7 x i8], i64, i64, double }* %tuple1, i32 0, i32 2 +// %count_star_val = load i64, i64* %src_slot // %count_star_inc = add i64 %count_star_val, 1 // store i64 %count_star_inc, i64* %src_slot -// %0 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1 -// %fn_ctx = load %"class.impala_udf::FunctionContext"** %0 -// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx, -// { i8, i64, i64, double }* %tuple1, +// %0 = getelementptr %"class.impala_udf::FunctionContext"*, +// %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1 +// %agg_fn_ctx = load %"class.impala_udf::FunctionContext"*, +// %"class.impala_udf::FunctionContext"** %0 +// %1 = call %"class.impala::ExprContext"* +// @_ZNK6impala26PartitionedAggregationNode17GetAggExprContextEi( +// %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 1) +// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx, +// %"class.impala::ExprContext"* %1, +// { i8, [7 x i8], i64, i64, double }* %tuple1, // %"class.impala::TupleRow"* %row) -// %1 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2 -// %fn_ctx2 = load %"class.impala_udf::FunctionContext"** %1 -// call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* %fn_ctx2, -// { i8, i64, i64, double }* %tuple1, +// %2 = getelementptr %"class.impala_udf::FunctionContext"*, +// %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2 +// %agg_fn_ctx2 = load %"class.impala_udf::FunctionContext"*, +// %"class.impala_udf::FunctionContext"** %2 +// %3 = call %"class.impala::ExprContext"* +// @_ZNK6impala26PartitionedAggregationNode17GetAggExprContextEi( +// %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 2) +// call void @UpdateSlot.3(%"class.impala_udf::FunctionContext"* %agg_fn_ctx2, +// %"class.impala::ExprContext"* %3, +// { i8, [7 x i8], i64, i64, double }* %tuple1, // 
%"class.impala::TupleRow"* %row) // ret void // } @@ -1726,13 +1763,13 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) { Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME); Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - PointerType* agg_node_ptr_type = agg_node_type->getPointerTo(); - PointerType* fn_ctx_ptr_ptr_type = fn_ctx_type->getPointerTo()->getPointerTo(); - PointerType* tuple_ptr_type = tuple_type->getPointerTo(); - PointerType* tuple_row_ptr_type = tuple_row_type->getPointerTo(); + PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type); + PointerType* fn_ctx_ptr_ptr_type = codegen->GetPtrPtrType(fn_ctx_type); + PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_type); + PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type); StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen); - PointerType* tuple_ptr = PointerType::get(tuple_struct, 0); + PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct); LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type()); prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctxs", fn_ctx_ptr_ptr_type)); @@ -1743,7 +1780,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) { LlvmCodeGen::LlvmBuilder builder(codegen->context()); Value* args[5]; *fn = prototype.GeneratePrototype(&builder, &args[0]); - + Value* this_arg = args[0]; Value* agg_fn_ctxs_arg = args[1]; Value* tuple_arg = args[2]; Value* row_arg = args[3]; @@ -1752,6 +1789,10 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) { // TODO: get rid of this by using right type in function signature tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple"); + Function* get_expr_ctx_fn = + codegen->GetFunction(IRFunction::PART_AGG_NODE_GET_EXPR_CTX, false); + DCHECK(get_expr_ctx_fn != NULL); + // Loop over 
each expr and generate the IR for that slot. If the expr is not // count(*), generate a helper IR function to update the slot and call that. j = grouping_expr_ctxs_.size(); @@ -1770,9 +1811,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) { } else { Function* update_slot_fn; RETURN_IF_ERROR(CodegenUpdateSlot(evaluator, slot_desc, &update_slot_fn)); - Value* fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i); - Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx"); - builder.CreateCall(update_slot_fn, ArrayRef<Value*>({fn_ctx, tuple_arg, row_arg})); + Value* agg_fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i); + Value* agg_fn_ctx = builder.CreateLoad(agg_fn_ctx_ptr, "agg_fn_ctx"); + // Call GetExprCtx() to get the expression context. + DCHECK(agg_expr_ctxs_[i] != NULL); + Value* get_expr_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) }; + Value* agg_expr_ctx = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args); + Value* update_slot_args[] = { agg_fn_ctx, agg_expr_ctx, tuple_arg, row_arg }; + builder.CreateCall(update_slot_fn, update_slot_args); } } builder.CreateRetVoid(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 952dcd7..c766ab2 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -193,11 +193,18 @@ class PartitionedAggregationNode : public ExecNode { /// are doing a streaming preaggregation. bool is_streaming_preagg_; - /// Contains any evaluators that require the serialize step. + /// True if any of the evaluators require the serialize step. bool needs_serialize_; + /// The list of all aggregate operations for this exec node. 
std::vector<AggFnEvaluator*> aggregate_evaluators_; + /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed + /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime. + /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no Expr + /// in the aggregate evaluator (e.g. count(*)). + std::vector<ExprContext*> agg_expr_ctxs_; + /// FunctionContext for each aggregate function and backing MemPool. String data /// returned by the aggregate functions is allocated via these contexts. /// These contexts are only passed to the evaluators in the non-partitioned @@ -468,9 +475,9 @@ class PartitionedAggregationNode : public ExecNode { /// belonging to the same partition independent of whether the agg fn evaluators have /// is_merge() == true. /// This function is replaced by codegen (which is why we don't use a vector argument - /// for agg_fn_ctxs). Any var-len data is allocated from the FunctionContexts. + /// for agg_fn_ctxs). Any var-len data is allocated from the FunctionContexts. void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, - bool is_merge = false); + bool is_merge = false); /// Called on the intermediate tuple of each group after all input rows have been /// consumed and aggregated. Computes the final aggregate values to be returned in @@ -482,7 +489,7 @@ class PartitionedAggregationNode : public ExecNode { /// TODO: Coordinate the allocation of new tuples with the release of memory /// so as not to make memory consumption blow up. Tuple* GetOutputTuple(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, - Tuple* tuple, MemPool* pool); + Tuple* tuple, MemPool* pool); /// Do the aggregation for all tuple rows in the batch when there is no grouping. /// This function is replaced by codegen. 
@@ -517,6 +524,10 @@ class PartitionedAggregationNode : public ExecNode { template<bool AGGREGATED_ROWS> Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx); + /// Accessor for the expression context of an AggFnEvaluator. Used only in codegen'ed + /// version of UpdateTuple(). + ExprContext* IR_ALWAYS_INLINE GetAggExprContext(int i) const; + /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is /// the context for the partition's hash table and hash is the precomputed hash of /// the row. The row can be an unaggregated or aggregated row depending on http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exprs/agg-fn-evaluator.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index fb39789..cde969c 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -189,6 +189,8 @@ class AggFnEvaluator { /// intermediate_slot_desc_ if this agg fn has the same intermediate and output type. const SlotDescriptor* output_slot_desc_; + /// Expression contexts for this AggFnEvaluator. Empty if there is no + /// expression (e.g. count(*)). std::vector<ExprContext*> input_expr_ctxs_; /// The enum for some of the builtins that still require special cased logic. @@ -270,7 +272,7 @@ inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators, } } inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators, - const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) { + const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) { DCHECK_EQ(evaluators.size(), fn_ctxs.size()); for (int i = 0; i < evaluators.size(); ++i) { evaluators[i]->Add(fn_ctxs[i], src, dst);
