http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc index 87b2e49..0050b39 100644 --- a/be/src/exec/old-hash-table.cc +++ b/be/src/exec/old-hash-table.cc @@ -22,8 +22,8 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" @@ -63,16 +63,15 @@ static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED }; OldHashTable::OldHashTable(RuntimeState* state, - const vector<ExprContext*>& build_expr_ctxs, - const vector<ExprContext*>& probe_expr_ctxs, - const vector<ExprContext*>& filter_expr_ctxs, int num_build_tuples, bool stores_nulls, + const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs, + const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls, const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker, const vector<RuntimeFilter*>& runtime_filters, bool stores_tuples, int64_t num_buckets) : state_(state), - build_expr_ctxs_(build_expr_ctxs), - probe_expr_ctxs_(probe_expr_ctxs), - filter_expr_ctxs_(filter_expr_ctxs), + build_exprs_(build_exprs), + probe_exprs_(probe_exprs), + filter_exprs_(filter_exprs), filters_(runtime_filters), num_build_tuples_(num_build_tuples), stores_nulls_(stores_nulls), @@ -90,8 +89,8 @@ OldHashTable::OldHashTable(RuntimeState* state, mem_tracker_(mem_tracker), mem_limit_exceeded_(false) { DCHECK(mem_tracker != NULL); - DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size()); - DCHECK_EQ(build_expr_ctxs_.size(), finds_nulls_.size()); + DCHECK_EQ(build_exprs_.size(), 
probe_exprs_.size()); + DCHECK_EQ(build_exprs_.size(), finds_nulls_.size()); DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2"; buckets_.resize(num_buckets); num_buckets_ = num_buckets; @@ -99,21 +98,59 @@ OldHashTable::OldHashTable(RuntimeState* state, mem_tracker_->Consume(buckets_.capacity() * sizeof(Bucket)); // Compute the layout and buffer size to store the evaluated expr results - results_buffer_size_ = Expr::ComputeResultsLayout(build_expr_ctxs_, + results_buffer_size_ = ScalarExpr::ComputeResultsLayout(build_exprs_, &expr_values_buffer_offsets_, &var_result_begin_); expr_values_buffer_= new uint8_t[results_buffer_size_]; memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_); - expr_value_null_bits_ = new uint8_t[build_expr_ctxs_.size()]; + expr_value_null_bits_ = new uint8_t[build_exprs_.size()]; GrowNodeArray(); } -void OldHashTable::Close() { +Status OldHashTable::Init(ObjectPool* pool, RuntimeState* state) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, + mem_pool_.get(), &build_expr_evals_)); + DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, + mem_pool_.get(), &probe_expr_evals_)); + DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(filter_exprs_, state, pool, + mem_pool_.get(), &filter_expr_evals_)); + DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size()); + return Status::OK(); +} + +Status OldHashTable::Create(ObjectPool* pool, RuntimeState* state, + const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs, + const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls, + const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker, + const vector<RuntimeFilter*>& runtime_filters, scoped_ptr<OldHashTable>* hash_tbl, + bool stores_tuples, int64_t num_buckets) { + 
hash_tbl->reset(new OldHashTable(state, build_exprs, probe_exprs, filter_exprs, + num_build_tuples, stores_nulls, finds_nulls, initial_seed, mem_tracker, + runtime_filters, stores_tuples, num_buckets)); + return (*hash_tbl)->Init(pool, state); +} + +Status OldHashTable::Open(RuntimeState* state) { + RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state)); + DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state)); + DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(filter_expr_evals_, state)); + DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size()); + return Status::OK(); +} + +void OldHashTable::Close(RuntimeState* state) { // TODO: use tr1::array? delete[] expr_values_buffer_; delete[] expr_value_null_bits_; expr_values_buffer_ = NULL; expr_value_null_bits_ = NULL; + ScalarExprEvaluator::Close(build_expr_evals_, state); + ScalarExprEvaluator::Close(probe_expr_evals_, state); + ScalarExprEvaluator::Close(filter_expr_evals_, state); mem_pool_->FreeAll(); if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) { ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-num_data_pages_ * HT_PAGE_SIZE); @@ -122,12 +159,18 @@ void OldHashTable::Close() { buckets_.clear(); } +void OldHashTable::FreeLocalAllocations() { + ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_); + ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_); + ScalarExprEvaluator::FreeLocalAllocations(filter_expr_evals_); +} + bool OldHashTable::EvalRow( - TupleRow* row, const vector<ExprContext*>& ctxs) { + TupleRow* row, const vector<ScalarExprEvaluator*>& evals) { bool has_null = false; - for (int i = 0; i < ctxs.size(); ++i) { + for (int i = 0; i < evals.size(); ++i) { void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i]; - void* val = ctxs[i]->GetValue(row); + void* val = evals[i]->GetValue(row); if (val == NULL) { // If the table doesn't 
store nulls, no reason to keep evaluating if (!stores_nulls_) return true; @@ -138,7 +181,7 @@ bool OldHashTable::EvalRow( } else { expr_value_null_bits_[i] = false; } - RawValue::Write(val, loc, build_expr_ctxs_[i]->root()->type(), NULL); + RawValue::Write(val, loc, build_exprs_[i]->type(), NULL); } return has_null; } @@ -162,10 +205,9 @@ int OldHashTable::AddBloomFilters() { TupleRow* row = iter.GetRow(); for (int i = 0; i < filters_.size(); ++i) { if (bloom_filters[i] == NULL) continue; - void* e = filter_expr_ctxs_[i]->GetValue(row); - uint32_t h = - RawValue::GetHashValue(e, filter_expr_ctxs_[i]->root()->type(), - RuntimeFilterBank::DefaultHashSeed()); + void* e = filter_expr_evals_[i]->GetValue(row); + uint32_t h = RawValue::GetHashValue(e, filter_exprs_[i]->type(), + RuntimeFilterBank::DefaultHashSeed()); bloom_filters[i]->Insert(h); } iter.Next<false>(); @@ -239,7 +281,7 @@ static void CodegenAssignNullValue( } } -// Codegen for evaluating a tuple row over either build_expr_ctxs_ or probe_expr_ctxs_. +// Codegen for evaluating a tuple row over either build_exprs_ or probe_exprs_. // For the case where we are joining on a single int, the IR looks like // define i1 @EvaBuildRow(%"class.impala::OldHashTable"* %this_ptr, // %"class.impala::TupleRow"* %row) { @@ -267,9 +309,10 @@ static void CodegenAssignNullValue( // becomes the start of the next block for codegen (either the next expr or just the // end of the function). Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { - const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_; - for (int i = 0; i < ctxs.size(); ++i) { - PrimitiveType type = ctxs[i]->root()->type().type; + DCHECK_EQ(build_exprs_.size(), probe_exprs_.size()); + const vector<ScalarExpr*>& exprs = build ? 
build_exprs_ : probe_exprs_; + for (int i = 0; i < exprs.size(); ++i) { + PrimitiveType type = exprs[i]->type().type; if (type == TYPE_CHAR) return NULL; } @@ -295,14 +338,20 @@ Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { Value* row = args[1]; Value* has_null = codegen->false_value(); + IRFunction::Type fn_name = build ? + IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS : + IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS; + Function* get_expr_eval_fn = codegen->GetFunction(fn_name, false); + DCHECK(get_expr_eval_fn != NULL); + // Aggregation with no grouping exprs also use the hash table interface for - // code simplicity. In that case, there are no build exprs. - if (!build_expr_ctxs_.empty()) { - // Load &build_expr_ctxs_[0] / &probe_expr_ctxs_[0] - Value* ctx_array = codegen->CodegenCallFunction(&builder, build ? - IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_CTXS : - IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_CTXS, - this_ptr, "ctx_array"); + // code simplicity. In that case, there are no build exprs. + if (!exprs.empty()) { + // Load &build_expr_evals_[0] / &probe_expr_evals_[0] + Value* eval_vector = codegen->CodegenCallFunction(&builder, build ? 
+ IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS : + IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS, + this_ptr, "eval_vector"); // Load expr_values_buffer_ Value* expr_values_buffer = codegen->CodegenCallFunction(&builder, @@ -314,7 +363,7 @@ Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS, this_ptr, "expr_value_null_bits"); - for (int i = 0; i < ctxs.size(); ++i) { + for (int i = 0; i < exprs.size(); ++i) { BasicBlock* null_block = BasicBlock::Create(context, "null", fn); BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn); BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn); @@ -323,22 +372,23 @@ Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { Value* llvm_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer, codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "loc_addr"); llvm_loc = builder.CreatePointerCast(llvm_loc, - codegen->GetPtrType(ctxs[i]->root()->type()), "loc"); + codegen->GetPtrType(exprs[i]->type()), "loc"); - // Codegen GetValue() for ctxs[i] + // Codegen GetValue() for exprs[i] Function* expr_fn; - Status status = ctxs[i]->root()->GetCodegendComputeFn(codegen, &expr_fn); + Status status = exprs[i]->GetCodegendComputeFn(codegen, &expr_fn); if (!status.ok()) { fn->eraseFromParent(); // deletes function VLOG_QUERY << "Failed to codegen EvalTupleRow(): " << status.GetDetail(); return NULL; } - // Load ctxs[i] and call GetValue() - Value* expr_ctx = codegen->CodegenArrayAt(&builder, ctx_array, i, "expr_ctx"); - DCHECK(expr_ctx->getType()->isPointerTy()); - CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped( - codegen, &builder, ctxs[i]->root()->type(), expr_fn, {expr_ctx, row}, "result"); + // Load evals[i] and call GetValue() + Value* eval_arg = + codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); + DCHECK(eval_arg->getType()->isPointerTy()); + 
CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, + exprs[i]->type(), expr_fn, {eval_arg, row}, "result"); Value* is_null = result.GetIsNull(); // Set null-byte result @@ -354,7 +404,7 @@ Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { // hash table doesn't store nulls, no reason to keep evaluating exprs builder.CreateRet(codegen->true_value()); } else { - CodegenAssignNullValue(codegen, &builder, llvm_loc, ctxs[i]->root()->type()); + CodegenAssignNullValue(codegen, &builder, llvm_loc, exprs[i]->type()); has_null = codegen->true_value(); builder.CreateBr(continue_block); } @@ -378,10 +428,12 @@ uint32_t OldHashTable::HashVariableLenRow() { hash = HashUtil::Hash(expr_values_buffer_, var_result_begin_, hash); } - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // non-string and null slots are already part of expr_values_buffer - if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING - && build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue; + if (build_exprs_[i]->type().type != TYPE_STRING && + build_exprs_[i]->type().type != TYPE_VARCHAR) { + continue; + } void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i]; if (expr_value_null_bits_[i]) { @@ -425,9 +477,9 @@ uint32_t OldHashTable::HashVariableLenRow() { // } // TODO: can this be cross-compiled? 
Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // Disable codegen for CHAR - if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) return NULL; + if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL; } // Get types to generate function prototype @@ -469,11 +521,9 @@ Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) { this_ptr, "expr_value_null_bits"); // Hash string slots - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { - if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING && - build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) { - continue; - } + for (int i = 0; i < build_exprs_.size(); ++i) { + if (build_exprs_[i]->type().type != TYPE_STRING + && build_exprs_[i]->type().type != TYPE_VARCHAR) continue; BasicBlock* null_block = NULL; BasicBlock* not_null_block = NULL; @@ -543,8 +593,8 @@ Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) { } bool OldHashTable::Equals(TupleRow* build_row) { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { - void* val = build_expr_ctxs_[i]->GetValue(build_row); + for (int i = 0; i < build_exprs_.size(); ++i) { + void* val = build_expr_evals_[i]->GetValue(build_row); if (val == NULL) { if (!(stores_nulls_ && finds_nulls_[i])) return false; if (!expr_value_null_bits_[i]) return false; @@ -554,7 +604,7 @@ bool OldHashTable::Equals(TupleRow* build_row) { } void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i]; - if (!RawValue::Eq(loc, val, build_expr_ctxs_[i]->root()->type())) { + if (!RawValue::Eq(loc, val, build_exprs_[i]->type())) { return false; } } @@ -567,8 +617,8 @@ bool OldHashTable::Equals(TupleRow* build_row) { // define i1 @Equals(%"class.impala::OldHashTable"* %this_ptr, // %"class.impala::TupleRow"* %row) { // entry: -// %result = call i64 @GetSlotRef(%"class.impala::ExprContext"* inttoptr -// (i64 146381856 to 
%"class.impala::ExprContext"*), +// %result = call i64 @GetSlotRef(%"class.impala::ScalarExpr"* inttoptr +// (i64 146381856 to %"class.impala::ScalarExpr"*), // %"class.impala::TupleRow"* %row) // %0 = trunc i64 %result to i1 // br i1 %0, label %null, label %not_null @@ -588,8 +638,8 @@ bool OldHashTable::Equals(TupleRow* build_row) { // // continue: ; preds = %not_null, %null // %result4 = call { i64, i8* } @GetSlotRef1( -// %"class.impala::ExprContext"* inttoptr -// (i64 146381696 to %"class.impala::ExprContext"*), +// %"class.impala::ScalarExpr"* inttoptr +// (i64 146381696 to %"class.impala::ScalarExpr"*), // %"class.impala::TupleRow"* %row) // %4 = extractvalue { i64, i8* } %result4, 0 // %5 = trunc i64 %4 to i1 @@ -612,9 +662,9 @@ bool OldHashTable::Equals(TupleRow* build_row) { // ret i1 true // } Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // Disable codegen for CHAR - if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) return NULL; + if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL; } // Get types to generate function prototype @@ -637,12 +687,13 @@ Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { Value* this_ptr = args[0]; Value* row = args[1]; - if (!build_expr_ctxs_.empty()) { + if (!build_exprs_.empty()) { BasicBlock* false_block = BasicBlock::Create(context, "false_block", fn); - // Load &build_expr_ctxs_[0] - Value* ctx_vector = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_CTXS, this_ptr, "ctx_vector"); + // Load &build_expr_evals_[0] + Value* eval_vector = codegen->CodegenCallFunction(&builder, + IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS, + this_ptr, "eval_vector"); // Load expr_values_buffer_ Value* expr_values_buffer = codegen->CodegenCallFunction(&builder, @@ -654,25 +705,25 @@ Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { 
IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS, this_ptr, "expr_value_null_bits"); - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { BasicBlock* null_block = BasicBlock::Create(context, "null", fn); BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn); BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn); - // Generate GetValue() of build_expr_ctxs_[i] + // Generate GetValue() of build_expr_evals_[i] Function* expr_fn; - Status status = - build_expr_ctxs_[i]->root()->GetCodegendComputeFn(codegen, &expr_fn); + Status status = build_exprs_[i]->GetCodegendComputeFn(codegen, &expr_fn); if (!status.ok()) { fn->eraseFromParent(); // deletes function VLOG_QUERY << "Failed to codegen Equals(): " << status.GetDetail(); return NULL; } - // Call GetValue() on build_expr_ctxs_[i] - Value* expr_ctx = codegen->CodegenArrayAt(&builder, ctx_vector, i, "expr_ctx"); + // Call GetValue() on build_expr_evals_[i] + Value* eval_arg = + codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - build_expr_ctxs_[i]->root()->type(), expr_fn, {expr_ctx, row}, "result"); + build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result"); Value* is_null = result.GetIsNull(); // Determine if probe is null (i.e. expr_value_null_bits_[i] == true). 
In @@ -690,7 +741,7 @@ Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { Value* probe_val = builder.CreateInBoundsGEP(NULL, expr_values_buffer, codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "probe_val"); probe_val = builder.CreatePointerCast( - probe_val, codegen->GetPtrType(build_expr_ctxs_[i]->root()->type())); + probe_val, codegen->GetPtrType(build_exprs_[i]->type())); // Branch for GetValue() returning NULL builder.CreateCondBr(is_null, null_block, not_null_block); @@ -703,7 +754,7 @@ Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { builder.SetInsertPoint(not_null_block); if (stores_nulls_) { BasicBlock* cmp_block = BasicBlock::Create(context, "cmp", fn); - // First need to compare that probe expr[i] is not null + // First need to compare that probe_expr[i] is not null builder.CreateCondBr(probe_is_null, false_block, cmp_block); builder.SetInsertPoint(cmp_block); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table.h b/be/src/exec/old-hash-table.h index c5a487d..406f360 100644 --- a/be/src/exec/old-hash-table.h +++ b/be/src/exec/old-hash-table.h @@ -34,13 +34,13 @@ namespace llvm { namespace impala { -class Expr; -class ExprContext; class LlvmCodeGen; class MemTracker; class RuntimeFilter; class RowDescriptor; class RuntimeState; +class ScalarExpr; +class ScalarExprEvaluator; class Tuple; class TupleRow; @@ -95,7 +95,7 @@ class OldHashTable { /// Create a hash table. /// - build_exprs are the exprs that should be used to evaluate rows during Insert(). /// - probe_exprs are used during Find() - /// - filter_expr_ctxs are used to build runtime filters. + /// - filter_exprs are used to build runtime filters. /// - num_build_tuples: number of Tuples in the build tuple row /// - stores_nulls: if false, TupleRows with nulls are ignored during Insert /// - finds_nulls: if finds_nulls[i] is false, Find() returns End() for TupleRows with @@ -110,15 +110,24 @@ class OldHashTable { /// with '<=>' and others with '=', stores_nulls could distinguish between columns /// in which nulls are stored and columns in which they are not, which could save /// space by not storing some rows we know will never match. 
- OldHashTable(RuntimeState* state, const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<ExprContext*>& probe_expr_ctxs, - const std::vector<ExprContext*>& filter_expr_ctxs, int num_build_tuples, - bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, - MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& filters, - bool stores_tuples = false, int64_t num_buckets = 1024); + static Status Create(ObjectPool* pool, RuntimeState* state, + const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, + const std::vector<ScalarExpr*>& filter_exprs, + int num_build_tuples, bool stores_nulls, + const std::vector<bool>& finds_nulls, int32_t initial_seed, + MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& runtime_filters, + boost::scoped_ptr<OldHashTable>* hash_tbl_, bool stores_tuples = false, + int64_t num_buckets = 1024); + + /// Opens the build, probe and filter expression evaluators created by Create(). + Status Open(RuntimeState* state); /// Call to cleanup any resources. Must be called once. - void Close(); + void Close(RuntimeState* state); + + /// Frees local allocations made by expression evaluators. + void FreeLocalAllocations(); /// Insert row into the hash table. Row will be evaluated over build exprs. /// This will grow the hash table if necessary. @@ -351,13 +360,23 @@ class OldHashTable { Bucket() : node(NULL) { } }; - /// Simple wrappers to return various fields in this class. They are done to avoid - /// the need to make assumption about the order of declaration of these fields when - /// generating the handcrafted IR. + /// Use Create() instead.
+ OldHashTable(RuntimeState* state, const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, + const std::vector<ScalarExpr*>& filter_exprs, int num_build_tuples, + bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, + MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& filters, + bool stores_tuples, int64_t num_buckets); + + Status Init(ObjectPool* pool, RuntimeState* state); + + /// Simple wrappers to return various fields in this class. These functions are + /// cross-compiled and they exist to avoid the need to make assumptions about the + /// order of declaration of these fields when generating the handcrafted IR. uint8_t* IR_ALWAYS_INLINE expr_values_buffer() const; uint8_t* IR_ALWAYS_INLINE expr_value_null_bits() const; - ExprContext* const* IR_ALWAYS_INLINE build_expr_ctxs() const; - ExprContext* const* IR_ALWAYS_INLINE probe_expr_ctxs() const; + ScalarExprEvaluator* const* IR_ALWAYS_INLINE build_expr_evals() const; + ScalarExprEvaluator* const* IR_ALWAYS_INLINE probe_expr_evals() const; /// Returns the next non-empty bucket and updates idx to be the index of that bucket. /// If there are no more buckets, returns NULL and sets idx to -1 @@ -381,20 +400,20 @@ class OldHashTable { /// Evaluate the exprs over row and cache the results in 'expr_values_buffer_'. /// Returns whether any expr evaluated to NULL /// This will be replaced by codegen - bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs); + bool EvalRow(TupleRow* row, const std::vector<ScalarExprEvaluator*>& evals); /// Evaluate 'row' over build exprs caching the results in 'expr_values_buffer_' This /// will be replaced by codegen. We do not want this function inlined when cross /// compiled because we need to be able to differentiate between EvalBuildRow and /// EvalProbeRow by name and the build/probe exprs are baked into the codegen'd function.
bool IR_NO_INLINE EvalBuildRow(TupleRow* row) { - return EvalRow(row, build_expr_ctxs_); + return EvalRow(row, build_expr_evals_); } /// Evaluate 'row' over probe exprs caching the results in 'expr_values_buffer_' /// This will be replaced by codegen. bool IR_NO_INLINE EvalProbeRow(TupleRow* row) { - return EvalRow(row, probe_expr_ctxs_); + return EvalRow(row, probe_expr_evals_); } /// Compute the hash of the values in expr_values_buffer_. @@ -441,12 +460,18 @@ class OldHashTable { RuntimeState* state_; - const std::vector<ExprContext*>& build_expr_ctxs_; - const std::vector<ExprContext*>& probe_expr_ctxs_; - - /// Expression, one per filter in filters_, to evaluate per-build row which produces the - /// value with which to update the corresponding filter. - std::vector<ExprContext*> filter_expr_ctxs_; + /// References to the build expressions evaluated on each build row during insertion + /// and lookup, the probe expressions used during lookup, and the filter expressions, + /// one per filter in filters_, evaluated on each build row to produce the value with + /// which to update the corresponding filter. + const std::vector<ScalarExpr*>& build_exprs_; + const std::vector<ScalarExpr*>& probe_exprs_; + const std::vector<ScalarExpr*>& filter_exprs_; + + /// Evaluators for the expressions above. + std::vector<ScalarExprEvaluator*> build_expr_evals_; + std::vector<ScalarExprEvaluator*> probe_expr_evals_; + std::vector<ScalarExprEvaluator*> filter_expr_evals_; /// List of filters to build during build phase.
std::vector<RuntimeFilter*> filters_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc index b696131..cd5d336 100644 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ b/be/src/exec/partitioned-aggregation-node-ir.cc @@ -19,21 +19,18 @@ #include "exec/hash-table.inline.h" #include "exprs/agg-fn-evaluator.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/buffered-tuple-stream.inline.h" #include "runtime/row-batch.h" #include "runtime/tuple-row.h" using namespace impala; -ExprContext* const* PartitionedAggregationNode::GetAggExprContexts(int i) const { - return agg_expr_ctxs_[i]; -} - Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { Tuple* output_tuple = singleton_output_tuple_; FOREACH_ROW(batch, 0, batch_iter) { - UpdateTuple(&agg_fn_ctxs_[0], output_tuple, batch_iter.Get()); + UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.Get()); } return Status::OK(); } @@ -130,7 +127,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row, DCHECK(!found); } else if (found) { // Row is already in hash table. Do the aggregation and we're done. 
- UpdateTuple(&dst_partition->agg_fn_ctxs[0], it.GetTuple(), row); + UpdateTuple(dst_partition->agg_fn_evals.data(), it.GetTuple(), row); return Status::OK(); } @@ -144,11 +141,12 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__ TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) { while (true) { DCHECK(partition->aggregated_row_stream->is_pinned()); - Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_ctxs, + Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, partition->aggregated_row_stream.get(), &process_batch_status_); if (LIKELY(intermediate_tuple != NULL)) { - UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, row, AGGREGATED_ROWS); + UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, + row, AGGREGATED_ROWS); // After copying and initializing the tuple, insert it into the hash table. insert_it.SetTuple(intermediate_tuple, hash); return Status::OK(); @@ -200,13 +198,13 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, &process_batch_status_)) { RETURN_IF_ERROR(std::move(process_batch_status_)); // Tuple is not going into hash table, add it to the output batch. 
- Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_, + Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_evals_, out_batch->tuple_data_pool(), &process_batch_status_); if (UNLIKELY(intermediate_tuple == NULL)) { DCHECK(!process_batch_status_.ok()); return std::move(process_batch_status_); } - UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row); + UpdateTuple(agg_fn_evals_.data(), intermediate_tuple, in_row); out_batch_iterator.Get()->SetTuple(0, intermediate_tuple); out_batch_iterator.Next(); out_batch->CommitLastRow(); @@ -218,8 +216,7 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, } if (needs_serialize) { FOREACH_ROW(out_batch, 0, out_batch_iter) { - AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_, - out_batch_iter.Get()->GetTuple(0)); + AggFnEvaluator::Serialize(agg_fn_evals_, out_batch_iter.Get()->GetTuple(0)); } } @@ -242,7 +239,7 @@ bool PartitionedAggregationNode::TryAddToHashTable( } else if (*remaining_capacity == 0) { return false; } else { - intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_ctxs, + intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, partition->aggregated_row_stream.get(), status); if (LIKELY(intermediate_tuple != NULL)) { it.SetTuple(intermediate_tuple, hash); @@ -254,7 +251,7 @@ bool PartitionedAggregationNode::TryAddToHashTable( } } - UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row); + UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row); return true; }
