http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc index ecde742..35202a2 100644 --- a/be/src/exec/hash-join-node.cc +++ b/be/src/exec/hash-join-node.cc @@ -23,7 +23,7 @@ #include "codegen/llvm-codegen.h" #include "exec/old-hash-table.inline.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "gutil/strings/substitute.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" @@ -62,10 +62,10 @@ HashJoinNode::HashJoinNode( DCHECK_NE(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); match_all_probe_ = - (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN); - match_one_build_ = (join_op_ == TJoinOp::LEFT_SEMI_JOIN); + join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN; + match_one_build_ = join_op_ == TJoinOp::LEFT_SEMI_JOIN; match_all_build_ = - (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN); + join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN; } Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { @@ -73,17 +73,25 @@ Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { DCHECK(tnode.__isset.hash_join_node); const vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; + for (int i = 0; i < eq_join_conjuncts.size(); ++i) { - ExprContext* ctx; - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].left, &ctx)); - probe_expr_ctxs_.push_back(ctx); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].right, &ctx)); - build_expr_ctxs_.push_back(ctx); + ScalarExpr* probe_expr; + RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjuncts[i].left, child(0)->row_desc(), + state, &probe_expr)); + probe_exprs_.push_back(probe_expr); + ScalarExpr* build_expr; + 
RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjuncts[i].right, child(1)->row_desc(), + state, &build_expr)); + build_exprs_.push_back(build_expr); is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from); } - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, tnode.hash_join_node.other_join_conjuncts, - &other_join_conjunct_ctxs_)); + + // other_join_conjunct_evals_ are evaluated in the context of rows assembled from + // all build and probe tuples; full_row_desc is not necessarily the same as the output + // row desc, e.g., because semi joins only return the build xor probe tuples + RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts, + full_row_desc, state, &other_join_conjuncts_)); for (const TRuntimeFilterDesc& tfilter: tnode.runtime_filters) { // If filter propagation not enabled, only consider building broadcast joins (that may @@ -92,14 +100,15 @@ Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { !tfilter.is_broadcast_join) { continue; } - if (state->query_options().disable_row_runtime_filtering - && !tfilter.applied_on_partition_columns) { + if (state->query_options().disable_row_runtime_filtering && + !tfilter.applied_on_partition_columns) { continue; } filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true)); - ExprContext* ctx; - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, tfilter.src_expr, &ctx)); - filter_expr_ctxs_.push_back(ctx); + ScalarExpr* filter_expr; + RETURN_IF_ERROR( + ScalarExpr::Create(tfilter.src_expr, child(1)->row_desc(), state, &filter_expr)); + filter_exprs_.push_back(filter_expr); } return Status::OK(); } @@ -115,32 +124,20 @@ Status HashJoinNode::Prepare(RuntimeState* state) { // build and probe exprs are evaluated in the context of the rows produced by our // right and left children, respectively - RETURN_IF_ERROR( - Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), 
expr_mem_tracker())); - AddExprCtxsToFree(build_expr_ctxs_); - RETURN_IF_ERROR( - Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(probe_expr_ctxs_); - RETURN_IF_ERROR( - Expr::Prepare(filter_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(filter_expr_ctxs_); - - // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all - // build and probe tuples; full_row_desc is not necessarily the same as the output row - // desc, e.g., because semi joins only return the build xor probe tuples - RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); - RETURN_IF_ERROR(Expr::Prepare( - other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker())); - AddExprCtxsToFree(other_join_conjunct_ctxs_); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_, + expr_mem_pool(), &other_join_conjunct_evals_)); + AddEvaluatorsToFree(other_join_conjunct_evals_); // TODO: default buckets const bool stores_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false, std::logical_or<bool>()); - hash_tbl_.reset(new OldHashTable(state, build_expr_ctxs_, probe_expr_ctxs_, - filter_expr_ctxs_, child(1)->row_desc().tuple_descriptors().size(), stores_nulls, - is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_)); + + RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, probe_exprs_, + filter_exprs_, child(1)->row_desc().tuple_descriptors().size(), stores_nulls, + is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_, + &hash_tbl_)); build_pool_.reset(new MemPool(mem_tracker())); AddCodegenDisabledMessage(state); return Status::OK(); @@ -189,22 +186,21 @@ Status HashJoinNode::Reset(RuntimeState* state) { void HashJoinNode::Close(RuntimeState* state) { if (is_closed()) 
return; - if (hash_tbl_.get() != NULL) hash_tbl_->Close(); + if (hash_tbl_.get() != NULL) hash_tbl_->Close(state); if (build_pool_.get() != NULL) build_pool_->FreeAll(); - Expr::Close(build_expr_ctxs_, state); - Expr::Close(probe_expr_ctxs_, state); - Expr::Close(filter_expr_ctxs_, state); - Expr::Close(other_join_conjunct_ctxs_, state); + ScalarExprEvaluator::Close(other_join_conjunct_evals_, state); + ScalarExpr::Close(probe_exprs_); + ScalarExpr::Close(build_exprs_); + ScalarExpr::Close(other_join_conjuncts_); + ScalarExpr::Close(filter_exprs_); BlockingJoinNode::Close(state); } Status HashJoinNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(BlockingJoinNode::Open(state)); - RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(filter_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state)); + RETURN_IF_ERROR(hash_tbl_->Open(state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state)); // Check for errors and free local allocations before opening children. 
RETURN_IF_CANCELLED(state); @@ -216,6 +212,11 @@ Status HashJoinNode::Open(RuntimeState* state) { return Status::OK(); } +Status HashJoinNode::QueryMaintenance(RuntimeState* state) { + if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations(); + return ExecNode::QueryMaintenance(state); +} + Status HashJoinNode::ProcessBuildInput(RuntimeState* state) { // Do a full scan of child(1) and store everything in hash_tbl_ // The hash join node needs to keep in memory all build tuples, including the tuple @@ -301,11 +302,11 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos return LeftJoinGetNext(state, out_batch, eos); } - ExprContext* const* other_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; - int num_other_conjunct_ctxs = other_join_conjunct_ctxs_.size(); + const int num_other_conjuncts = other_join_conjuncts_.size(); + DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size()); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - int num_conjunct_ctxs = conjunct_ctxs_.size(); + const int num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); // Explicitly manage the timer counter to avoid measuring time in the child // GetNext call. 
@@ -322,7 +323,8 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos TupleRow* matched_build_row = hash_tbl_iterator_.GetRow(); CreateOutputRow(out_row, current_probe_row_, matched_build_row); - if (!EvalConjuncts(other_conjunct_ctxs, num_other_conjunct_ctxs, out_row)) { + if (!EvalConjuncts(other_join_conjunct_evals_.data(), + num_other_conjuncts, out_row)) { hash_tbl_iterator_.Next<true>(); continue; } @@ -336,7 +338,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos } hash_tbl_iterator_.Next<true>(); - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { + if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { out_batch->CommitLastRow(); VLOG_ROW << "match row: " << PrintRow(out_row, row_desc()); ++num_rows_returned_; @@ -355,7 +357,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos int row_idx = out_batch->AddRow(); TupleRow* out_row = out_batch->GetRow(row_idx); CreateOutputRow(out_row, current_probe_row_, NULL); - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { + if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { out_batch->CommitLastRow(); VLOG_ROW << "match row: " << PrintRow(out_row, row_desc()); ++num_rows_returned_; @@ -425,7 +427,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos int row_idx = out_batch->AddRow(); TupleRow* out_row = out_batch->GetRow(row_idx); CreateOutputRow(out_row, NULL, build_row); - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { + if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { out_batch->CommitLastRow(); VLOG_ROW << "match row: " << PrintRow(out_row, row_desc()); ++num_rows_returned_; @@ -492,8 +494,8 @@ void HashJoinNode::AddToDebugString(int indentation_level, stringstream* out) co *out << " hash_tbl="; *out << string(indentation_level * 2, ' '); *out << "HashTbl(" - << " build_exprs=" << 
Expr::DebugString(build_expr_ctxs_) - << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_); + << " build_exprs=" << ScalarExpr::DebugString(build_exprs_) + << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_); *out << ")"; } @@ -638,13 +640,13 @@ Function* HashJoinNode::CodegenProcessProbeBatch(LlvmCodeGen* codegen, // Codegen evaluating other join conjuncts Function* eval_other_conjuncts_fn; - Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjunct_ctxs_, + Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_, &eval_other_conjuncts_fn, "EvalOtherConjuncts"); if (!status.ok()) return NULL; // Codegen evaluating conjuncts Function* eval_conjuncts_fn; - status = ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs_, &eval_conjuncts_fn); + status = ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn); if (!status.ok()) return NULL; // Replace all call sites with codegen version
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h index e65dc16..b49f8bb 100644 --- a/be/src/exec/hash-join-node.h +++ b/be/src/exec/hash-join-node.h @@ -26,6 +26,8 @@ #include "exec/exec-node.h" #include "exec/old-hash-table.h" #include "exec/blocking-join-node.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "util/promise.h" #include "gen-cpp/PlanNodes_types.h" // for TJoinOp @@ -34,6 +36,8 @@ namespace impala { class MemPool; class RowBatch; +class ScalarExpr; +class ScalarExprEvaluator; class TupleRow; /// Node for in-memory hash joins: @@ -63,6 +67,7 @@ class HashJoinNode : public BlockingJoinNode { static const char* LLVM_CLASS_NAME; protected: + virtual Status QueryMaintenance(RuntimeState* state); virtual void AddToDebugString(int indentation_level, std::stringstream* out) const; virtual Status ProcessBuildInput(RuntimeState* state); @@ -75,18 +80,19 @@ class HashJoinNode : public BlockingJoinNode { /// our equi-join predicates "<lhs> = <rhs>" are separated into /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0)) - std::vector<ExprContext*> probe_expr_ctxs_; - std::vector<ExprContext*> build_expr_ctxs_; + std::vector<ScalarExpr*> probe_exprs_; + std::vector<ScalarExpr*> build_exprs_; /// Expressions used to build runtime filters, one per entry in filters_. - std::vector<ExprContext*> filter_expr_ctxs_; + std::vector<ScalarExpr*> filter_exprs_; /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS /// NOT DISTINCT FROM, rather than equality. 
std::vector<bool> is_not_distinct_from_; /// non-equi-join conjuncts from the JOIN clause - std::vector<ExprContext*> other_join_conjunct_ctxs_; + std::vector<ScalarExpr*> other_join_conjuncts_; + std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_; /// Derived from join_op_ /// Output all rows coming from the probe input. Used in LEFT_OUTER_JOIN and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-table-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-ir.cc b/be/src/exec/hash-table-ir.cc index 4da81e2..6ba0874 100644 --- a/be/src/exec/hash-table-ir.cc +++ b/be/src/exec/hash-table-ir.cc @@ -23,12 +23,12 @@ using namespace impala; uint32_t HashTableCtx::GetHashSeed() const { return seeds_[level_]; } -ExprContext* const* HashTableCtx::GetBuildExprCtxs() const { - return build_expr_ctxs_.data(); +ScalarExprEvaluator* const* HashTableCtx::build_expr_evals() const { + return build_expr_evals_.data(); } -ExprContext* const* HashTableCtx::GetProbeExprCtxs() const { - return probe_expr_ctxs_.data(); +ScalarExprEvaluator* const* HashTableCtx::probe_expr_evals() const { + return probe_expr_evals_.data(); } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index e776a68..42bc7e1 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -26,8 +26,8 @@ #include "common/compiler-util.h" #include "common/init.h" #include "exec/hash-table.inline.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" @@ -56,35 +56,41 @@ class HashTableTest : public testing::Test { ObjectPool 
pool_; MemTracker tracker_; MemPool mem_pool_; - vector<ExprContext*> build_expr_ctxs_; - vector<ExprContext*> probe_expr_ctxs_; + vector<ScalarExpr*> build_exprs_; + vector<ScalarExprEvaluator*> build_expr_evals_; + vector<ScalarExpr*> probe_exprs_; + vector<ScalarExprEvaluator*> probe_expr_evals_; int next_query_id_ = 0; virtual void SetUp() { test_env_.reset(new TestEnv()); ASSERT_OK(test_env_->Init()); - RowDescriptor desc; - Status status; // Not very easy to test complex tuple layouts so this test will use the // simplest. The purpose of these tests is to exercise the hash map // internals so a simple build/probe expr is fine. - Expr* expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */)); - build_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr))); - ASSERT_OK(Expr::Prepare(build_expr_ctxs_, NULL, desc, &tracker_)); - ASSERT_OK(Expr::Open(build_expr_ctxs_, NULL)); - - expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */)); - probe_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr))); - ASSERT_OK(Expr::Prepare(probe_expr_ctxs_, NULL, desc, &tracker_)); - ASSERT_OK(Expr::Open(probe_expr_ctxs_, NULL)); + ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */)); + ASSERT_OK(build_expr->Init(desc, nullptr)); + build_exprs_.push_back(build_expr); + ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_, + &build_expr_evals_)); + ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr)); + + ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */)); + ASSERT_OK(probe_expr->Init(desc, nullptr)); + probe_exprs_.push_back(probe_expr); + ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_, + &probe_expr_evals_)); + ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr)); } virtual void TearDown() { - Expr::Close(build_expr_ctxs_, NULL); - Expr::Close(probe_expr_ctxs_, NULL); - runtime_state_ = NULL; + 
ScalarExprEvaluator::Close(build_expr_evals_, nullptr); + ScalarExprEvaluator::Close(probe_expr_evals_, nullptr); + ScalarExpr::Close(build_exprs_); + ScalarExpr::Close(probe_exprs_); + runtime_state_ = nullptr; test_env_.reset(); mem_pool_.FreeAll(); } @@ -123,10 +129,10 @@ class HashTableTest : public testing::Test { HashTable::Iterator iter = table->Begin(ht_ctx); while (!iter.AtEnd()) { TupleRow* row = iter.GetRow(); - int32_t val = *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(row)); + int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row)); EXPECT_GE(val, min); EXPECT_LT(val, max); - if (all_unique) EXPECT_TRUE(results[val] == NULL); + if (all_unique) EXPECT_TRUE(results[val] == nullptr); EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0)); results[val] = row; iter.Next(); @@ -138,9 +144,9 @@ class HashTableTest : public testing::Test { void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) { EXPECT_TRUE(probe_row != build_row); int32_t build_val = - *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(probe_row)); + *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row)); int32_t probe_val = - *reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(build_row)); + *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row)); EXPECT_EQ(build_val, probe_val); } @@ -197,7 +203,7 @@ class HashTableTest : public testing::Test { // Initial_num_buckets must be a power of two. 
EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets)); int64_t max_num_buckets = 1L << 31; - table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, NULL, + table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, nullptr, max_num_buckets, initial_num_buckets)); return (*table)->Init(); } @@ -210,16 +216,16 @@ class HashTableTest : public testing::Test { TupleRow* probe_row4 = CreateTupleRow(4); int32_t* val_row1 = - reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row1)); + reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1)); EXPECT_EQ(*val_row1, 1); int32_t* val_row2 = - reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row2)); + reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2)); EXPECT_EQ(*val_row2, 2); int32_t* val_row3 = - reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row3)); + reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3)); EXPECT_EQ(*val_row3, 3); int32_t* val_row4 = - reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row4)); + reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4)); EXPECT_EQ(*val_row4, 4); // Create and close the hash table. @@ -233,7 +239,7 @@ class HashTableTest : public testing::Test { hash_table->Close(); } - // IMPALA-2897: Build rows that are equivalent (where NULLs are counted as equivalent) + // IMPALA-2897: Build rows that are equivalent (where nullptrs are counted as equivalent) // should not occupy distinct buckets. 
void NullBuildRowTest() { TupleRow* build_rows[2]; @@ -243,10 +249,10 @@ class HashTableTest : public testing::Test { scoped_ptr<HashTable> hash_table; ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table)); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, true /* stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); - EXPECT_OK(status); + EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_, + build_exprs_, probe_exprs_, true /* stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx)); + EXPECT_OK(ht_ctx->Open(runtime_state_)); for (int i = 0; i < 2; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; @@ -257,7 +263,7 @@ class HashTableTest : public testing::Test { } EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1); hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } // This test inserts the build rows [0->5) to hash table. 
It validates that they @@ -279,10 +285,11 @@ class HashTableTest : public testing::Test { scoped_ptr<HashTable> hash_table; ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table)); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, false /* !stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); + EXPECT_OK(ht_ctx->Open(runtime_state_)); bool success = hash_table->CheckAndResize(5, ht_ctx.get()); ASSERT_TRUE(success); for (int i = 0; i < 5; ++i) { @@ -323,7 +330,7 @@ class HashTableTest : public testing::Test { ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false); hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } void ScanTest(bool quadratic, int initial_size, int rows_to_insert, @@ -333,10 +340,11 @@ class HashTableTest : public testing::Test { int total_rows = rows_to_insert + additional_rows; scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, false /* !stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); + EXPECT_OK(ht_ctx->Open(runtime_state_)); // Add 1 row with val 1, 2 with val 2, etc. 
vector<TupleRow*> build_rows; @@ -378,7 +386,7 @@ class HashTableTest : public testing::Test { delete [] probe_rows; hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } // This test continues adding tuples to the hash table and exercises the resize code @@ -391,9 +399,9 @@ class HashTableTest : public testing::Test { scoped_ptr<HashTable> hash_table; ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table)); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, false /* !stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20) @@ -434,7 +442,7 @@ class HashTableTest : public testing::Test { } } hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } // This test inserts and probes as many elements as the size of the hash table without @@ -446,9 +454,9 @@ class HashTableTest : public testing::Test { ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table)); EXPECT_EQ(hash_table->EmptyBuckets(), table_size); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, false /* !stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); // Insert and probe table_size different tuples. 
All of them are expected to be @@ -504,7 +512,7 @@ class HashTableTest : public testing::Test { EXPECT_FALSE(found); hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } // This test makes sure we can tolerate the low memory case where we do not have enough @@ -518,14 +526,14 @@ class HashTableTest : public testing::Test { ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size, max_num_blocks, reserved_blocks)); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, - probe_expr_ctxs_, false /* !stores_nulls_ */, - vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1, + &mem_pool_, &ht_ctx); EXPECT_OK(status); HashTable::Iterator iter = hash_table->Begin(ht_ctx.get()); EXPECT_TRUE(iter.AtEnd()); hash_table->Close(); - ht_ctx->Close(); + ht_ctx->Close(runtime_state_); } }; @@ -607,20 +615,21 @@ TEST_F(HashTableTest, HashEmpty) { EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr( 0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_)); scoped_ptr<HashTableCtx> ht_ctx; - Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, probe_expr_ctxs_, - false /* !stores_nulls_ */, vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1, - &tracker_, &ht_ctx); + Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, + probe_exprs_, false /* !stores_nulls_ */, + vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); + EXPECT_OK(ht_ctx->Open(runtime_state_)); uint32_t seed = 9999; ht_ctx->set_level(0); - EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed)); + EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, seed)); // TODO: level 0 uses CRC hash, which only swaps bytes around on empty input. 
- // EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed))); + // EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, ht_ctx->Hash(nullptr, 0, seed))); ht_ctx->set_level(1); - EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed)); - EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed))); - ht_ctx.get()->Close(); + EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, seed)); + EXPECT_NE(seed, ht_ctx->Hash(nullptr, 0, ht_ctx->Hash(nullptr, 0, seed))); + ht_ctx->Close(runtime_state_); } TEST_F(HashTableTest, VeryLowMemTest) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index d8a7cdc..28831ce 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -23,9 +23,9 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" #include "exprs/slot-ref.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/buffered-block-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" @@ -85,24 +85,24 @@ static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED, static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 }; static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t); -HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, +HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, - int max_levels, MemTracker* tracker) - : build_expr_ctxs_(build_expr_ctxs), - probe_expr_ctxs_(probe_expr_ctxs), + int max_levels, MemPool* mem_pool) + : build_exprs_(build_exprs), 
+ probe_exprs_(probe_exprs), stores_nulls_(stores_nulls), finds_nulls_(finds_nulls), finds_some_nulls_(std::accumulate( finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())), level_(0), scratch_row_(NULL), - tracker_(tracker) { + mem_pool_(mem_pool) { DCHECK(!finds_some_nulls_ || stores_nulls_); // Compute the layout and buffer size to store the evaluated expr results - DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size()); - DCHECK_EQ(build_expr_ctxs_.size(), finds_nulls_.size()); - DCHECK(!build_expr_ctxs_.empty()); + DCHECK_EQ(build_exprs_.size(), probe_exprs_.size()); + DCHECK_EQ(build_exprs_.size(), finds_nulls_.size()); + DCHECK(!build_exprs_.empty()); // Populate the seeds to use for all the levels. TODO: revisit how we generate these. DCHECK_GE(max_levels, 0); @@ -115,30 +115,57 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs, } } -Status HashTableCtx::Create(RuntimeState* state, - const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, - const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, - int num_build_tuples, MemTracker* tracker, scoped_ptr<HashTableCtx>* ht_ctx) { - ht_ctx->reset(new HashTableCtx(build_expr_ctxs, probe_expr_ctxs, stores_nulls, - finds_nulls, initial_seed, max_levels, tracker)); - return ht_ctx->get()->Init(state, num_build_tuples); -} - -Status HashTableCtx::Init(RuntimeState* state, int num_build_tuples) { +Status HashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_tuples) { int scratch_row_size = sizeof(Tuple*) * num_build_tuples; scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size)); if (UNLIKELY(scratch_row_ == NULL)) { return Status(Substitute("Failed to allocate $0 bytes for scratch row of " "HashTableCtx.", scratch_row_size)); } - return expr_values_cache_.Init(state, tracker_, build_expr_ctxs_); + 
RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, mem_pool_, + &build_expr_evals_)); + DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, mem_pool_, + &probe_expr_evals_)); + DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size()); + return expr_values_cache_.Init(state, mem_pool_->mem_tracker(), build_exprs_); +} + +Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state, + const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls, + const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, + int num_build_tuples, MemPool* mem_pool, scoped_ptr<HashTableCtx>* ht_ctx) { + ht_ctx->reset(new HashTableCtx(build_exprs, probe_exprs, stores_nulls, + finds_nulls, initial_seed, max_levels, mem_pool)); + return (*ht_ctx)->Init(pool, state, num_build_tuples); } -void HashTableCtx::Close() { +Status HashTableCtx::Open(RuntimeState* state) { + RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state)); + return Status::OK(); +} + +void HashTableCtx::Close(RuntimeState* state) { free(scratch_row_); scratch_row_ = NULL; - expr_values_cache_.Close(tracker_); + expr_values_cache_.Close(mem_pool_->mem_tracker()); + ScalarExprEvaluator::Close(build_expr_evals_, state); + ScalarExprEvaluator::Close(probe_expr_evals_, state); +} + +void HashTableCtx::FreeBuildLocalAllocations() { + ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_); +} + +void HashTableCtx::FreeProbeLocalAllocations() { + ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_); +} + +void HashTableCtx::FreeLocalAllocations() { + FreeBuildLocalAllocations(); + FreeProbeLocalAllocations(); } uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const { @@ -161,12 +188,13 @@ uint32_t HashTableCtx::HashRow( } } -bool 
HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs, +bool HashTableCtx::EvalRow(const TupleRow* row, + const vector<ScalarExprEvaluator*>& evals, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { bool has_null = false; - for (int i = 0; i < ctxs.size(); ++i) { + for (int i = 0; i < evals.size(); ++i) { void* loc = expr_values_cache_.ExprValuePtr(expr_values, i); - void* val = ctxs[i]->GetValue(row); + void* val = evals[i]->GetValue(row); if (val == NULL) { // If the table doesn't store nulls, no reason to keep evaluating if (!stores_nulls_) return true; @@ -176,9 +204,8 @@ bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs } else { expr_values_null[i] = false; } - DCHECK_LE(build_expr_ctxs_[i]->root()->type().GetSlotSize(), - sizeof(NULL_VALUE)); - RawValue::Write(val, loc, build_expr_ctxs_[i]->root()->type(), NULL); + DCHECK_LE(build_exprs_[i]->type().GetSlotSize(), sizeof(NULL_VALUE)); + RawValue::Write(val, loc, build_exprs_[i]->type(), NULL); } return has_null; } @@ -192,11 +219,12 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values, hash = Hash(expr_values, var_result_offset, hash); } - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // non-string and null slots are already part of 'expr_values'. 
- if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING - && build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue; - + if (build_exprs_[i]->type().type != TYPE_STRING && + build_exprs_[i]->type().type != TYPE_VARCHAR) { + continue; + } const void* loc = expr_values_cache_.ExprValuePtr(expr_values, i); if (expr_values_null[i]) { // Hash the null random seed values at 'loc' @@ -214,8 +242,8 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values, template <bool FORCE_NULL_EQUALITY> bool HashTableCtx::Equals(const TupleRow* build_row, const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { - void* val = build_expr_ctxs_[i]->GetValue(build_row); + for (int i = 0; i < build_expr_evals_.size(); ++i) { + void* val = build_expr_evals_[i]->GetValue(build_row); if (val == NULL) { if (!(FORCE_NULL_EQUALITY || finds_nulls_[i])) return false; if (!expr_values_null[i]) return false; @@ -225,13 +253,11 @@ bool HashTableCtx::Equals(const TupleRow* build_row, const uint8_t* expr_values, } const void* loc = expr_values_cache_.ExprValuePtr(expr_values, i); - if (!RawValue::Eq(loc, val, build_expr_ctxs_[i]->root()->type())) { - return false; - } + DCHECK(build_exprs_[i] == &build_expr_evals_[i]->root()); + if (!RawValue::Eq(loc, val, build_exprs_[i]->type())) return false; } return true; } - template bool HashTableCtx::Equals<true>(const TupleRow* build_row, const uint8_t* expr_values, const uint8_t* expr_values_null) const; template bool HashTableCtx::Equals<false>(const TupleRow* build_row, @@ -249,11 +275,11 @@ HashTableCtx::ExprValuesCache::ExprValuesCache() null_bitmap_(0) {} Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state, - MemTracker* tracker, const std::vector<ExprContext*>& build_expr_ctxs) { + MemTracker* tracker, const std::vector<ScalarExpr*>& build_exprs) { // Initialize the number of expressions. 
- num_exprs_ = build_expr_ctxs.size(); + num_exprs_ = build_exprs.size(); // Compute the layout of evaluated values of a row. - expr_values_bytes_per_row_ = Expr::ComputeResultsLayout(build_expr_ctxs, + expr_values_bytes_per_row_ = ScalarExpr::ComputeResultsLayout(build_exprs, &expr_values_offsets_, &var_result_offset_); if (expr_values_bytes_per_row_ == 0) { DCHECK_EQ(num_exprs_, 0); @@ -623,8 +649,8 @@ static void CodegenAssignNullValue( } } -// Codegen for evaluating a tuple row over either build_expr_ctxs_ or probe_expr_ctxs_. -// For a group by with (big int, string) the IR looks like: +// Codegen for evaluating a tuple row over either build_expr_evals_ or +// probe_expr_evals_. For a group by with (big int, string) the IR looks like: // // define i1 @EvalProbeRow(%"class.impala::HashTableCtx"* %this_ptr, // %"class.impala::TupleRow"* %row, i8* %expr_values, i8* %expr_values_null) #34 { @@ -695,10 +721,10 @@ static void CodegenAssignNullValue( // becomes the start of the next block for codegen (either the next expr or just the // end of the function). Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** fn) { - const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_; - for (int i = 0; i < ctxs.size(); ++i) { + const vector<ScalarExpr*>& exprs = build ? build_exprs_ : probe_exprs_; + for (int i = 0; i < exprs.size(); ++i) { // Disable codegen for CHAR - if (ctxs[i]->root()->type().type == TYPE_CHAR) { + if (exprs[i]->type().type == TYPE_CHAR) { return Status("HashTableCtx::CodegenEvalRow(): CHAR NYI"); } } @@ -728,13 +754,13 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** Value* expr_values_null = args[3]; Value* has_null = codegen->false_value(); - // ctx_vector = &build_expr_ctxs_[0] / ctx_vector = &probe_expr_ctxs_[0] - Value* ctx_vector = codegen->CodegenCallFunction(&builder, build ? 
- IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX : - IRFunction::HASH_TABLE_GET_PROBE_EXPR_CTX, - this_ptr, "ctx_vector"); + // evaluator_vector = &build_expr_evals_[0] / &probe_expr_evals_[0] + Value* eval_vector = codegen->CodegenCallFunction(&builder, build ? + IRFunction::HASH_TABLE_GET_BUILD_EXPR_EVALUATORS : + IRFunction::HASH_TABLE_GET_PROBE_EXPR_EVALUATORS, + this_ptr, "eval_vector"); - for (int i = 0; i < ctxs.size(); ++i) { + for (int i = 0; i < exprs.size(); ++i) { // TODO: refactor this to somewhere else? This is not hash table specific except for // the null handling bit and would be used for anyone that needs to materialize a // vector of exprs @@ -743,7 +769,7 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** Value* loc = builder.CreateInBoundsGEP( NULL, expr_values, codegen->GetIntConstant(TYPE_INT, offset), "loc_addr"); Value* llvm_loc = builder.CreatePointerCast( - loc, codegen->GetPtrType(ctxs[i]->root()->type()), "loc"); + loc, codegen->GetPtrType(exprs[i]->type()), "loc"); BasicBlock* null_block = BasicBlock::Create(context, "null", *fn); BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", *fn); @@ -751,7 +777,7 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** // Call expr Function* expr_fn; - Status status = ctxs[i]->root()->GetCodegendComputeFn(codegen, &expr_fn); + Status status = exprs[i]->GetCodegendComputeFn(codegen, &expr_fn); if (!status.ok()) { (*fn)->eraseFromParent(); // deletes function *fn = NULL; @@ -764,9 +790,10 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** codegen->SetNoInline(expr_fn); } - Value* expr_ctx = codegen->CodegenArrayAt(&builder, ctx_vector, i, "expr_ctx"); + Value* eval_arg = + codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped( - codegen, &builder, ctxs[i]->root()->type(), expr_fn, {expr_ctx, row}, "result"); + codegen, &builder, 
exprs[i]->type(), expr_fn, {eval_arg, row}, "result"); Value* is_null = result.GetIsNull(); // Set null-byte result @@ -782,7 +809,7 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** // hash table doesn't store nulls, no reason to keep evaluating exprs builder.CreateRet(codegen->true_value()); } else { - CodegenAssignNullValue(codegen, &builder, llvm_loc, ctxs[i]->root()->type()); + CodegenAssignNullValue(codegen, &builder, llvm_loc, exprs[i]->type()); builder.CreateBr(continue_block); } @@ -804,7 +831,7 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** builder.CreateRet(has_null); // Avoid inlining a large EvalRow() function into caller. - if (ctxs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { + if (exprs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { codegen->SetNoInline(*fn); } @@ -851,9 +878,9 @@ Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** // ret i32 %hash_phi // } Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, Function** fn) { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // Disable codegen for CHAR - if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) { + if (build_exprs_[i]->type().type == TYPE_CHAR) { return Status("HashTableCtx::CodegenHashRow(): CHAR NYI"); } } @@ -906,9 +933,11 @@ Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, Funct } // Hash string slots - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { - if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING - && build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue; + for (int i = 0; i < build_exprs_.size(); ++i) { + if (build_exprs_[i]->type().type != TYPE_STRING && + build_exprs_[i]->type().type != TYPE_VARCHAR) { + continue; + } BasicBlock* null_block = NULL; BasicBlock* not_null_block = NULL; @@ -980,7 +1009,7 @@ Status 
HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, Funct builder.CreateRet(hash_result); // Avoid inlining into caller if there are many exprs. - if (build_expr_ctxs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { + if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { codegen->SetNoInline(*fn); } *fn = codegen->FinalizeFunction(*fn); @@ -1059,9 +1088,9 @@ Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, Funct // } Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool force_null_equality, Function** fn) { - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { // Disable codegen for CHAR - if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) { + if (build_exprs_[i]->type().type == TYPE_CHAR) { return Status("HashTableCtx::CodegenEquals(): CHAR NYI"); } } @@ -1090,36 +1119,36 @@ Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool force_null_equalit Value* expr_values = args[2]; Value* expr_values_null = args[3]; - // ctx_vector = &build_expr_ctxs_[0] - Value* ctx_vector = codegen->CodegenCallFunction(&builder, - IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX, this_ptr, "ctx_vector"); + // eval_vector = &build_expr_evals_[0] + Value* eval_vector = codegen->CodegenCallFunction(&builder, + IRFunction::HASH_TABLE_GET_BUILD_EXPR_EVALUATORS, this_ptr, "eval_vector"); BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn); - for (int i = 0; i < build_expr_ctxs_.size(); ++i) { + for (int i = 0; i < build_exprs_.size(); ++i) { BasicBlock* null_block = BasicBlock::Create(context, "null", *fn); BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", *fn); BasicBlock* continue_block = BasicBlock::Create(context, "continue", *fn); // call GetValue on build_exprs[i] Function* expr_fn; - Status status = build_expr_ctxs_[i]->root()->GetCodegendComputeFn(codegen, &expr_fn); + Status status = 
build_exprs_[i]->GetCodegendComputeFn(codegen, &expr_fn); if (!status.ok()) { (*fn)->eraseFromParent(); // deletes function *fn = NULL; return Status( Substitute("Problem with HashTableCtx::CodegenEquals: $0", status.GetDetail())); } - if (build_expr_ctxs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { + if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { // Avoid bloating function by inlining too many exprs into it. codegen->SetNoInline(expr_fn); } - // Load ExprContext*: expr_ctx = ctx_vector[i]; - Value* expr_ctx = codegen->CodegenArrayAt(&builder, ctx_vector, i, "expr_ctx"); - + // Load ScalarExprEvaluator*: eval = eval_vector[i]; + Value* eval_arg = + codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); // Evaluate the expression. CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - build_expr_ctxs_[i]->root()->type(), expr_fn, {expr_ctx, row}, "result"); + build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result"); Value* is_null = result.GetIsNull(); // Determine if row is null (i.e. expr_values_null[i] == true). In @@ -1141,7 +1170,7 @@ Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool force_null_equalit Value* loc = builder.CreateInBoundsGEP( NULL, expr_values, codegen->GetIntConstant(TYPE_INT, offset), "loc"); Value* row_val = builder.CreatePointerCast( - loc, codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), "row_val"); + loc, codegen->GetPtrType(build_exprs_[i]->type()), "row_val"); // Branch for GetValue() returning NULL builder.CreateCondBr(is_null, null_block, not_null_block); @@ -1170,7 +1199,7 @@ Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool force_null_equalit builder.CreateRet(codegen->false_value()); // Avoid inlining into caller if it is large. 
- if (build_expr_ctxs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { + if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { codegen->SetNoInline(*fn); } *fn = codegen->FinalizeFunction(*fn); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index a93d601..9ba5b04 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -38,12 +38,12 @@ namespace llvm { namespace impala { -class Expr; -class ExprContext; class LlvmCodeGen; class MemTracker; class RowDescriptor; class RuntimeState; +class ScalarExpr; +class ScalarExprEvaluator; class Tuple; class TupleRow; class HashTable; @@ -108,39 +108,29 @@ class HashTable; /// needed by a thread to operate on a hash table. class HashTableCtx { public: - /// Create a hash table context. - /// - build_exprs are the exprs that should be used to evaluate rows during Insert(). - /// - probe_exprs are used during FindProbeRow() - /// - stores_nulls: if false, TupleRows with nulls are ignored during Insert - /// - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() for - /// TupleRows with nulls in position i even if stores_nulls is true. - /// - initial_seed: initial seed value to use when computing hashes for rows with - /// level 0. Other levels have their seeds derived from this seed. - /// - max_levels: the max levels we will hash with. - /// - tracker: the memory tracker of the exec node which owns this hash table context. - /// Memory usage of expression values cache is charged against it. 
- /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined - /// with '<=>' and others with '=', stores_nulls could distinguish between columns - /// in which nulls are stored and columns in which they are not, which could save - /// space by not storing some rows we know will never match. - HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, - const std::vector<bool>& finds_nulls, int32_t initial_seed, - int max_levels, MemTracker* tracker); - /// Create a hash table context with the specified parameters, invoke Init() to - /// initialize the new hash table context and return it in 'ht_ctx'. Please see header - /// comments of HashTableCtx constructor for details of the parameters. - /// 'num_build_tuples' is the number of tuples of a row in the build side, used for - /// computing the size of a scratch row. - static Status Create(RuntimeState* state, - const std::vector<ExprContext*>& build_expr_ctxs, - const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, + /// initialize the new hash table context and return it in 'ht_ctx'. Expression + /// evaluators for the build and probe expressions will also be allocated. + /// Please see the comments of HashTableCtx constructor and Init() for details + /// of other parameters. + static Status Create(ObjectPool* pool, RuntimeState* state, + const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, - int num_build_tuples, MemTracker* tracker, boost::scoped_ptr<HashTableCtx>* ht_ctx); + int num_build_tuples, MemPool* mem_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx); - /// Call to cleanup any resources. - void Close(); + /// Initialize the build and probe expression evaluators. 
+ Status Open(RuntimeState* state); + + /// Call to cleanup any resources allocated by the expression evaluators. + void Close(RuntimeState* state); + + /// Free local allocations made by build and probe expression evaluators respectively. + void FreeBuildLocalAllocations(); + void FreeProbeLocalAllocations(); + + /// Free local allocations of both build and probe expression evaluators. + void FreeLocalAllocations(); void set_level(int level); @@ -249,7 +239,7 @@ class HashTableCtx { /// if memory allocation leads to the memory limits of the exec node to be exceeded. /// 'tracker' is the memory tracker of the exec node which owns this HashTableCtx. Status Init(RuntimeState* state, MemTracker* tracker, - const std::vector<ExprContext*>& build_expr_ctxs); + const std::vector<ScalarExpr*>& build_exprs); /// Frees up various resources and updates memory tracker with proper accounting. /// 'tracker' should be the same memory tracker which was passed in for Init(). @@ -398,9 +388,34 @@ class HashTableCtx { friend class HashTable; friend class HashTableTest_HashEmpty_Test; + /// Construct a hash table context. + /// - build_exprs are the exprs that should be used to evaluate rows during Insert(). + /// - probe_exprs are used during FindProbeRow() + /// - stores_nulls: if false, TupleRows with nulls are ignored during Insert + /// - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() for + /// TupleRows with nulls in position i even if stores_nulls is true. + /// - initial_seed: initial seed value to use when computing hashes for rows with + /// level 0. Other levels have their seeds derived from this seed. + /// - max_levels: the max levels we will hash with. + /// - mem_pool: the MemPool which the expression evaluators allocate from. Owned by the + /// exec node which owns this hash table context. Memory usage of the expression + /// value cache is charged against its MemTracker. 
+ /// + /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined + /// with '<=>' and others with '=', stores_nulls could distinguish between columns + /// in which nulls are stored and columns in which they are not, which could save + /// space by not storing some rows we know will never match. + HashTableCtx(const std::vector<ScalarExpr*>& build_exprs, + const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls, + const std::vector<bool>& finds_nulls, int32_t initial_seed, + int max_levels, MemPool* mem_pool); + /// Allocate various buffers for storing expression evaluation results, hash values, - /// null bits etc. Returns error if allocation causes query memory limit to be exceeded. - Status Init(RuntimeState* state, int num_build_tuples); + /// null bits etc. Also allocate evaluators for the build and probe expressions and + /// store them in 'pool'. Returns error if allocation causes query memory limit to + /// be exceeded or the evaluators fail to initialize. 'num_build_tuples' is the number + /// of tuples of a row in the build side, used for computing the size of a scratch row. + Status Init(ObjectPool* pool, RuntimeState* state, int num_build_tuples); /// Compute the hash of the values in 'expr_values' with nullness 'expr_values_null'. /// This will be replaced by codegen. We don't want this inlined for replacing @@ -418,14 +433,14 @@ class HashTableCtx { /// codegen'd function. bool IR_NO_INLINE EvalBuildRow( const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { - return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null); + return EvalRow(row, build_expr_evals_, expr_values, expr_values_null); } /// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness /// into 'expr_values_null'. This will be replaced by codegen. 
bool IR_NO_INLINE EvalProbeRow( const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { - return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null); + return EvalRow(row, probe_expr_evals_, expr_values, expr_values_null); } /// Compute the hash of the values in 'expr_values' with nullness 'expr_values_null' @@ -436,7 +451,7 @@ class HashTableCtx { /// Evaluate the exprs over row, storing the values into 'expr_values' and nullness into /// 'expr_values_null'. Returns whether any expr evaluated to NULL. This will be /// replaced by codegen. - bool EvalRow(const TupleRow* row, const std::vector<ExprContext*>& ctxs, + bool EvalRow(const TupleRow* row, const std::vector<ScalarExprEvaluator*>& evaluators, uint8_t* expr_values, uint8_t* expr_values_null) noexcept; /// Returns true if the values of build_exprs evaluated over 'build_row' equal the @@ -462,13 +477,18 @@ class HashTableCtx { bool IR_NO_INLINE stores_nulls() const { return stores_nulls_; } bool IR_NO_INLINE finds_some_nulls() const { return finds_some_nulls_; } - /// Cross-compiled function to access the build/probe expression context. - /// Called by generated LLVM IR functions such as Equals() and EvalRow(). - ExprContext* const* IR_ALWAYS_INLINE GetBuildExprCtxs() const; - ExprContext* const* IR_ALWAYS_INLINE GetProbeExprCtxs() const; + /// Cross-compiled function to access the build/probe expression evaluators. + ScalarExprEvaluator* const* IR_ALWAYS_INLINE build_expr_evals() const; + ScalarExprEvaluator* const* IR_ALWAYS_INLINE probe_expr_evals() const; + + /// The exprs used to evaluate rows for inserting rows into hash table. + /// Also used when matching hash table entries against probe rows. + const std::vector<ScalarExpr*>& build_exprs_; + std::vector<ScalarExprEvaluator*> build_expr_evals_; - const std::vector<ExprContext*>& build_expr_ctxs_; - const std::vector<ExprContext*>& probe_expr_ctxs_; + /// The exprs used to evaluate rows for look-up in the hash table. 
+ const std::vector<ScalarExpr*>& probe_exprs_; + std::vector<ScalarExprEvaluator*> probe_expr_evals_; /// Constants on how the hash table should behave. Joins and aggs have slightly /// different behavior. @@ -492,9 +512,9 @@ class HashTableCtx { /// Scratch buffer to generate rows on the fly. TupleRow* scratch_row_; - /// Memory tracker of the exec node which owns this hash table context. Account the - /// memory usage of expression values cache towards it. - MemTracker* tracker_; + /// MemPool for 'build_expr_evals_' and 'probe_expr_evals_' to allocate from. + /// Not owned. + MemPool* mem_pool_; }; /// The hash table consists of a contiguous array of buckets that contain a pointer to the http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-table.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h index f055649..aff7c14 100644 --- a/be/src/exec/hash-table.inline.h +++ b/be/src/exec/hash-table.inline.h @@ -21,9 +21,6 @@ #include "exec/hash-table.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" - namespace impala { inline bool HashTableCtx::EvalAndHashBuild(const TupleRow* row) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hbase-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc index 8ac492d..3d74d81 100644 --- a/be/src/exec/hbase-scan-node.cc +++ b/be/src/exec/hbase-scan-node.cc @@ -246,7 +246,8 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo if (state->abort_on_error()) return Status(state->ErrorLog()); } - if (EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) { + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + if (EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) { row_batch->CommitLastRow(); 
++num_rows_returned_; COUNTER_SET(rows_returned_counter_, num_rows_returned_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hbase-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc index 9c5998b..8f9c64f 100644 --- a/be/src/exec/hbase-table-sink.cc +++ b/be/src/exec/hbase-table-sink.cc @@ -20,8 +20,9 @@ #include <vector> #include "common/logging.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "common/status.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/ImpalaInternalService_constants.h" #include "runtime/mem-tracker.h" #include "util/runtime-profile-counters.h" @@ -33,40 +34,26 @@ namespace impala { const static string& ROOT_PARTITION_KEY = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY; -HBaseTableSink::HBaseTableSink(const RowDescriptor& row_desc, - const vector<TExpr>& select_list_texprs, - const TDataSink& tsink) - : DataSink(row_desc), - table_id_(tsink.table_sink.target_table_id), - table_desc_(NULL), - hbase_table_writer_(NULL), - select_list_texprs_(select_list_texprs) { -} - -Status HBaseTableSink::PrepareExprs(RuntimeState* state) { - // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), select_list_texprs_, - &output_expr_ctxs_)); - // Prepare the exprs to run. 
- RETURN_IF_ERROR( - Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - return Status::OK(); +HBaseTableSink::HBaseTableSink(const RowDescriptor& row_desc, const TDataSink& tsink) + : DataSink(row_desc), + table_id_(tsink.table_sink.target_table_id), + table_desc_(NULL), + hbase_table_writer_(NULL) { + DCHECK(tsink.__isset.table_sink); } Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); SCOPED_TIMER(profile()->total_time_counter()); - // Get the hbase table descriptor. The table name will be used. + // Get the hbase table descriptor. The table name will be used. table_desc_ = static_cast<HBaseTableDescriptor*>( state->desc_tbl().GetTableDescriptor(table_id_)); - // Prepare the expressions. - RETURN_IF_ERROR(PrepareExprs(state)); // Now that expressions are ready to materialize tuples, create the writer. hbase_table_writer_.reset( - new HBaseTableWriter(table_desc_, output_expr_ctxs_, profile())); + new HBaseTableWriter(table_desc_, output_expr_evals_, profile())); - // Try and init the table writer. This can create connections to HBase and + // Try and init the table writer. This can create connections to HBase and // to zookeeper. RETURN_IF_ERROR(hbase_table_writer_->Init(state)); @@ -80,13 +67,9 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track return Status::OK(); } -Status HBaseTableSink::Open(RuntimeState* state) { - return Expr::Open(output_expr_ctxs_, state); -} - Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) { SCOPED_TIMER(profile()->total_time_counter()); - ExprContext::FreeLocalAllocations(output_expr_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_); RETURN_IF_ERROR(state->CheckQueryState()); // Since everything is set up just forward everything to the writer. 
RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch)); @@ -108,7 +91,6 @@ void HBaseTableSink::Close(RuntimeState* state) { hbase_table_writer_->Close(state); hbase_table_writer_.reset(NULL); } - Expr::Close(output_expr_ctxs_, state); DataSink::Close(state); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hbase-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h index 3326ddb..714c90b 100644 --- a/be/src/exec/hbase-table-sink.h +++ b/be/src/exec/hbase-table-sink.h @@ -35,19 +35,14 @@ namespace impala { /// eventually be written into an HBase table. class HBaseTableSink : public DataSink { public: - HBaseTableSink(const RowDescriptor& row_desc, - const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); + HBaseTableSink(const RowDescriptor& row_desc, const TDataSink& tsink); virtual std::string GetName() { return "HBaseTableSink"; } virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); - virtual Status Open(RuntimeState* state); virtual Status Send(RuntimeState* state, RowBatch* batch); virtual Status FlushFinal(RuntimeState* state); virtual void Close(RuntimeState* state); private: - /// Turn thrift TExpr into Expr and prepare them to run - Status PrepareExprs(RuntimeState* state); - /// Used to get the HBaseTableDescriptor from the RuntimeState TableId table_id_; @@ -58,10 +53,6 @@ class HBaseTableSink : public DataSink { /// hbase_table_writer is owned by this sink and should be closed /// when this is Close'd. boost::scoped_ptr<HBaseTableWriter> hbase_table_writer_; - - /// Owned by the RuntimeState. 
- const std::vector<TExpr>& select_list_texprs_; - std::vector<ExprContext*> output_expr_ctxs_; }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hbase-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc index 7574adc..e621ad8 100644 --- a/be/src/exec/hbase-table-writer.cc +++ b/be/src/exec/hbase-table-writer.cc @@ -21,8 +21,8 @@ #include <sstream> #include "common/logging.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/hbase-table-factory.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -45,14 +45,12 @@ jmethodID HBaseTableWriter::list_add_id_ = NULL; jmethodID HBaseTableWriter::put_add_id_ = NULL; HBaseTableWriter::HBaseTableWriter(HBaseTableDescriptor* table_desc, - const vector<ExprContext*>& output_expr_ctxs, - RuntimeProfile* profile) - : table_desc_(table_desc), - table_(NULL), - output_expr_ctxs_(output_expr_ctxs), - put_list_(NULL), - runtime_profile_(profile) { -}; + const vector<ScalarExprEvaluator*>& output_expr_evals, RuntimeProfile* profile) + : table_desc_(table_desc), + table_(NULL), + output_expr_evals_(output_expr_evals), + put_list_(NULL), + runtime_profile_(profile) { } Status HBaseTableWriter::Init(RuntimeState* state) { RETURN_IF_ERROR(state->htable_factory()->GetTable(table_desc_->name(), @@ -72,7 +70,8 @@ Status HBaseTableWriter::Init(RuntimeState* state) { cf_arrays_.reserve(num_col - 1); qual_arrays_.reserve(num_col - 1); for (int i = 0; i < num_col; ++i) { - output_exprs_byte_sizes_[i] = output_expr_ctxs_[i]->root()->type().GetByteSize(); + output_exprs_byte_sizes_[i] = + output_expr_evals_[i]->root().type().GetByteSize(); if (i == 0) continue; @@ -138,20 +137,20 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) { TupleRow* current_row = 
batch->GetRow(idx_batch); jobject put = NULL; - if (output_expr_ctxs_[0]->GetValue(current_row) == NULL) { + if (output_expr_evals_[0]->GetValue(current_row) == NULL) { // HBase row key must not be null. return Status("Cannot insert into HBase with a null row key."); } for (int j = 0; j < num_cols; j++) { const HBaseTableDescriptor::HBaseColumnDescriptor& col = table_desc_->cols()[j]; - void* value = output_expr_ctxs_[j]->GetValue(current_row); + void* value = output_expr_evals_[j]->GetValue(current_row); if (value != NULL) { if (!col.binary_encoded) { // Text encoded string_value.clear(); - output_expr_ctxs_[j]->PrintValue(value, &string_value); + output_expr_evals_[j]->PrintValue(value, &string_value); data = string_value.data(); data_len = string_value.length(); } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hbase-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.h b/be/src/exec/hbase-table-writer.h index bc414ab..8360cd9 100644 --- a/be/src/exec/hbase-table-writer.h +++ b/be/src/exec/hbase-table-writer.h @@ -45,7 +45,7 @@ class RowBatch; class HBaseTableWriter { public: HBaseTableWriter(HBaseTableDescriptor* table_desc, - const std::vector<ExprContext*>& output_expr_ctxs, + const std::vector<ScalarExprEvaluator*>& output_expr_evals, RuntimeProfile* profile); Status AppendRows(RowBatch* batch); @@ -89,12 +89,13 @@ class HBaseTableWriter { /// up using close before the table can be discarded. boost::scoped_ptr<HBaseTable> table_; - /// The expressions that are run to create tuples to be written to hbase. - const std::vector<ExprContext*> output_expr_ctxs_; - - /// output_exprs_byte_sizes_[i] is the byte size of output_expr_ctxs_[i]->root()'s type. + /// Contains the byte size of output_expr_evals_[i]->root()'s type. std::vector<int> output_exprs_byte_sizes_; + /// Reference to the evaluators of expressions which generate the output value. 
+ /// The evaluators are owned by the sink which owns this table writer. + const std::vector<ScalarExprEvaluator*>& output_expr_evals_; + /// jni ArrayList<Put> jobject put_list_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index 6c737a4..0df7c46 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -78,7 +78,7 @@ Status HdfsAvroScanner::Open(ScannerContext* context) { } Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node, - const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) { + const vector<ScalarExpr*>& conjuncts, Function** decode_avro_data_fn) { *decode_avro_data_fn = NULL; DCHECK(node->runtime_state()->ShouldCodegen()); LlvmCodeGen* codegen = node->runtime_state()->codegen(); @@ -86,8 +86,8 @@ Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node, Function* materialize_tuple_fn = NULL; RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn)); DCHECK(materialize_tuple_fn != NULL); - RETURN_IF_ERROR(CodegenDecodeAvroData(codegen, materialize_tuple_fn, - conjunct_ctxs, decode_avro_data_fn)); + RETURN_IF_ERROR(CodegenDecodeAvroData(codegen, materialize_tuple_fn, conjuncts, + decode_avro_data_fn)); DCHECK(*decode_avro_data_fn != NULL); return Status::OK(); } @@ -1018,7 +1018,7 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, } Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen, - Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs, + Function* materialize_tuple_fn, const vector<ScalarExpr*>& conjuncts, Function** decode_avro_data_fn) { SCOPED_TIMER(codegen->codegen_timer()); DCHECK(materialize_tuple_fn != NULL); @@ -1029,8 +1029,7 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen, 
DCHECK_EQ(replaced, 1); Function* eval_conjuncts_fn; - RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs, - &eval_conjuncts_fn)); + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn)); replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); DCHECK_EQ(replaced, 1); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-avro-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index 595a733..b09d87f 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -96,7 +96,7 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if /// codegen was successful or NULL otherwise. static Status Codegen(HdfsScanNodeBase* node, - const std::vector<ExprContext*>& conjunct_ctxs, + const std::vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn); protected: @@ -199,7 +199,7 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// successful or returns an error. 
static Status CodegenDecodeAvroData(LlvmCodeGen* codegen, llvm::Function* materialize_tuple_fn, - const std::vector<ExprContext*>& conjunct_ctxs, + const std::vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn); /// Codegens a version of MaterializeTuple() that reads records based on the table http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-avro-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc index 00a51dd..46185e8 100644 --- a/be/src/exec/hdfs-avro-table-writer.cc +++ b/be/src/exec/hdfs-avro-table-writer.cc @@ -28,8 +28,8 @@ #include "util/compress.h" #include "util/hdfs-util.h" #include "util/uid-util.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -55,10 +55,9 @@ const int DEFAULT_AVRO_BLOCK_SIZE = 64 * 1024; HdfsAvroTableWriter::HdfsAvroTableWriter(HdfsTableSink* parent, RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc, - const vector<ExprContext*>& output_exprs) : - HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs), - unflushed_rows_(0) { + const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc) + : HdfsTableWriter(parent, state, output, partition, table_desc), + unflushed_rows_(0) { mem_pool_.reset(new MemPool(parent->mem_tracker())); } @@ -67,8 +66,8 @@ void HdfsAvroTableWriter::ConsumeRow(TupleRow* row) { int num_non_partition_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols(); for (int j = 0; j < num_non_partition_cols; ++j) { - void* value = output_expr_ctxs_[j]->GetValue(row); - AppendField(output_expr_ctxs_[j]->root()->type(), value); + void* 
value = output_expr_evals_[j]->GetValue(row); + AppendField(output_expr_evals_[j]->root().type(), value); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-avro-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h index 01a79f7..f85659e 100644 --- a/be/src/exec/hdfs-avro-table-writer.h +++ b/be/src/exec/hdfs-avro-table-writer.h @@ -30,14 +30,14 @@ namespace impala { -class Expr; struct ColumnType; +class HdfsTableSink; +class RuntimeState; +class ScalarExprEvaluator; class TupleDescriptor; class TupleRow; -class RuntimeState; -class HdfsTableSink; -struct StringValue; struct OutputPartition; +struct StringValue; /// Consumes rows and outputs the rows into an Avro file in HDFS /// Each Avro file contains a block of records (rows). The file metadata specifies the @@ -64,8 +64,7 @@ class HdfsAvroTableWriter : public HdfsTableWriter { HdfsAvroTableWriter(HdfsTableSink* parent, RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* partition, - const HdfsTableDescriptor* table_desc, - const std::vector<ExprContext*>& output_exprs); + const HdfsTableDescriptor* table_desc); virtual ~HdfsAvroTableWriter() { } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc b/be/src/exec/hdfs-parquet-scanner-ir.cc index c9cf1eb..69951c7 100644 --- a/be/src/exec/hdfs-parquet-scanner-ir.cc +++ b/be/src/exec/hdfs-parquet-scanner-ir.cc @@ -19,7 +19,7 @@ #include "exec/filter-context.h" #include "exec/parquet-scratch-tuple-batch.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/runtime-filter.h" #include "runtime/runtime-filter.inline.h" #include "runtime/tuple-row.h" @@ -27,8 +27,8 @@ using namespace 
impala; int HdfsParquetScanner::ProcessScratchBatch(RowBatch* dst_batch) { - ExprContext* const* conjunct_ctxs = &(*scanner_conjunct_ctxs_)[0]; - const int num_conjuncts = scanner_conjunct_ctxs_->size(); + ScalarExprEvaluator* const* conjunct_evals = &(*conjunct_evals_)[0]; + const int num_conjuncts = conjunct_evals_->size(); // Start/end/current iterators over the output rows. Tuple** output_row_start = @@ -54,7 +54,7 @@ int HdfsParquetScanner::ProcessScratchBatch(RowBatch* dst_batch) { if (!EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) { continue; } - if (!ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, + if (!ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, reinterpret_cast<TupleRow*>(output_row))) { continue; }
