http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index b133be6..f7c1f88 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -33,9 +33,10 @@ namespace impala { -class ExprContext; class RowDescriptor; class RuntimeState; +class ScalarExpr; +class ScalarExprEvaluator; /// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned /// into PARTITION_FANOUT partitions, with partitions spilled if the full build side @@ -73,7 +74,8 @@ class PhjBuilder : public DataSink { PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc, RuntimeState* state); - Status Init(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts, + Status InitExprsAndFilters(RuntimeState* state, + const std::vector<TEqJoinCondition>& eq_join_conjuncts, const std::vector<TRuntimeFilterDesc>& filters); /// Implementations of DataSink interface methods. @@ -236,6 +238,12 @@ class PhjBuilder : public DataSink { std::unique_ptr<BufferedTupleStream> build_rows_; }; + protected: + /// Init() function inherited from DataSink. Overridden to be a no-op for now. + /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink. + virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) override; + private: /// Computes the minimum number of buffers required to execute the spilling partitioned /// hash algorithm successfully for any input size (assuming enough disk space is @@ -253,6 +261,9 @@ class PhjBuilder : public DataSink { return num_reserved_buffers; } + /// Free local allocations made from expr evaluators during hash table construction. + void FreeLocalAllocations() const; + /// Create and initialize a set of hash partitions for partitioning level 'level'. /// The previous hash partitions must have been cleared with ClearHashPartitions(). /// After calling this, batches are added to the new partitions by calling Send(). @@ -357,18 +368,18 @@ class PhjBuilder : public DataSink { /// If true, the build side has at least one row. bool non_empty_build_; - /// Expr contexts to free after partitioning or inserting each batch. - std::vector<ExprContext*> expr_ctxs_to_free_; - - /// Expression contexts over input rows for hash table build. - std::vector<ExprContext*> build_expr_ctxs_; + /// Expressions over input rows for hash table build. + std::vector<ScalarExpr*> build_exprs_; /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS /// NOT DISTINCT FROM, rather than equality. std::vector<bool> is_not_distinct_from_; - /// List of filters to build. - std::vector<FilterContext> filters_; + /// Expressions for evaluating input rows for insertion into runtime filters. + std::vector<ScalarExpr*> filter_exprs_; + + /// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'. + std::vector<FilterContext> filter_ctxs_; /// Used for hash-related functionality, such as evaluating rows and calculating hashes. /// The level is set to the same level as 'hash_partitions_'. @@ -423,8 +434,8 @@ class PhjBuilder : public DataSink { /// Partition used for null-aware joins. This partition is always processed at the end /// after all build and probe rows are processed. In this partition's 'build_rows_', we - /// store all the rows for which 'build_expr_ctxs_' evaluated over the row returns NULL - /// (i.e. it has a NULL on the eq join slot). + /// store all the rows for which 'build_expr_evals_' evaluated over the row returns + /// NULL (i.e. it has a NULL on the eq join slot). /// NULL if the join is not null aware or we are done processing this partition. Partition* null_aware_partition_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index bbed06d..42c9f4f 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -33,14 +33,14 @@ namespace impala { // TODO: explicitly set the calling convention? // TODO: investigate using fastcc for all codegen internal functions? bool IR_NO_INLINE EvalOtherJoinConjuncts( - ExprContext* const* ctxs, int num_ctxs, TupleRow* row) { - return ExecNode::EvalConjuncts(ctxs, num_ctxs, row); + ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) { + return ExecNode::EvalConjuncts(evals, num_evals, row); } bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { DCHECK(current_probe_row_ != NULL); TupleRow* out_row = out_batch_iterator->Get(); for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) { @@ -50,11 +50,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin( // Create an output row with all probe/build tuples and evaluate the // non-equi-join conjuncts. CreateOutputRow(out_row, current_probe_row_, matched_build_row); - if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts, + if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts, out_row)) { continue; } - if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) { + if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { --(*remaining_capacity); if (*remaining_capacity == 0) { hash_tbl_iterator_.NextDuplicate(); @@ -68,9 +68,9 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin( template<int const JoinOp> bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { DCHECK(current_probe_row_ != NULL); DCHECK(JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN); TupleRow* out_row = out_batch_iterator->Get(); @@ -83,7 +83,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins( // build and probe tuples. if (num_other_join_conjuncts > 0) { CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row); - if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, + if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts, semi_join_staging_row_)) { continue; } @@ -93,7 +93,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins( // Update the hash table to indicate that this entry has been matched. hash_tbl_iterator_.SetMatched(); if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN && - ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) { + ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { --(*remaining_capacity); if (*remaining_capacity == 0) { hash_tbl_iterator_.NextDuplicate(); @@ -107,9 +107,10 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins( template<int const JoinOp> bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) { + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, + Status* status) { DCHECK(current_probe_row_ != NULL); DCHECK(JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN || JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); @@ -121,7 +122,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( // build and probe tuples. if (num_other_join_conjuncts > 0) { CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row); - if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, + if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts, semi_join_staging_row_)) { continue; } @@ -133,7 +134,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( hash_tbl_iterator_.SetAtEnd(); // Append to output batch for left semi joins if the conjuncts are satisfied. if (JoinOp == TJoinOp::LEFT_SEMI_JOIN && - ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) { + ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { --(*remaining_capacity); if (*remaining_capacity == 0) return false; out_row = out_batch_iterator->Next(); @@ -159,7 +160,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( } } // No match for this current_probe_row_, we need to output it. No need to - // evaluate the conjunct_ctxs since anti joins cannot have any. + // evaluate the conjunct_evals since anti joins cannot have any. out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row); matched_probe_ = true; --(*remaining_capacity); @@ -171,9 +172,9 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( template<int const JoinOp> bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) { DCHECK(JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN); DCHECK(current_probe_row_ != NULL); @@ -184,7 +185,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( // Create an output row with all probe/build tuples and evaluate the // non-equi-join conjuncts. CreateOutputRow(out_row, current_probe_row_, matched_build_row); - if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts, + if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts, out_row)) { continue; } @@ -195,7 +196,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( // as matched for right/full outer joins. hash_tbl_iterator_.SetMatched(); } - if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) { + if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { --(*remaining_capacity); if (*remaining_capacity == 0) { hash_tbl_iterator_.NextDuplicate(); @@ -208,7 +209,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( if (JoinOp != TJoinOp::RIGHT_OUTER_JOIN && !matched_probe_) { // No match for this row, we need to output it if it's a left/full outer join. CreateOutputRow(out_row, current_probe_row_, NULL); - if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) { + if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { matched_probe_ = true; --(*remaining_capacity); if (*remaining_capacity == 0) return false; @@ -220,28 +221,30 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( template <int const JoinOp> bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) { + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, + Status* status) { if (JoinOp == TJoinOp::INNER_JOIN) { - return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts, - conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity); + return ProcessProbeRowInnerJoin(other_join_conjunct_evals, + num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator, + remaining_capacity); } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN) { - return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs, - num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator, + return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_evals, + num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator, remaining_capacity); } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN || JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs, - num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator, + return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_evals, + num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator, remaining_capacity, status); } else { DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN); - return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs, - num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator, + return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_evals, + num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator, remaining_capacity); } } @@ -269,7 +272,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // Fetch the hash and expr values' nullness for this row. if (expr_vals_cache->IsRowNull()) { if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && builder_->non_empty_build()) { - const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size(); + const int num_other_join_conjuncts = other_join_conjunct_evals_.size(); // For NAAJ, we need to treat NULLs on the probe carefully. The logic is: // 1. No build rows -> Return this row. The check for 'non_empty_build_' // is for this case. @@ -363,10 +366,11 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) { DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION || state_ == REPARTITIONING_PROBE); - ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; - const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size(); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - const int num_conjuncts = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* other_join_conjunct_evals = + other_join_conjunct_evals_.data(); + const int num_other_join_conjuncts = other_join_conjunct_evals_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + const int num_conjuncts = conjunct_evals_.size(); DCHECK(!out_batch->AtCapacity()); DCHECK_GE(probe_batch_pos_, 0); @@ -399,9 +403,9 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode do { // 'current_probe_row_' can be NULL on the first iteration through this loop. if (current_probe_row_ != NULL) { - if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts, - conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity, - status)) { + if (!ProcessProbeRow<JoinOp>(other_join_conjunct_evals, + num_other_join_conjuncts, conjunct_evals, num_conjuncts, + &out_batch_iterator, &remaining_capacity, status)) { if (status->ok()) DCHECK_EQ(remaining_capacity, 0); break; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index d3a10d3..d36aa71 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -24,8 +24,8 @@ #include "codegen/llvm-codegen.h" #include "exec/hash-table.inline.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "runtime/buffered-block-mgr.h" #include "runtime/buffered-tuple-stream.inline.h" @@ -79,19 +79,26 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state // being separated out further. builder_.reset( new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state)); - RETURN_IF_ERROR(builder_->Init(state, eq_join_conjuncts, tnode.runtime_filters)); + RETURN_IF_ERROR( + builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters)); for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) { - ExprContext* ctx; - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.left, &ctx)); - probe_expr_ctxs_.push_back(ctx); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.right, &ctx)); - build_expr_ctxs_.push_back(ctx); + ScalarExpr* probe_expr; + RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.left, child(0)->row_desc(), + state, &probe_expr)); + probe_exprs_.push_back(probe_expr); + ScalarExpr* build_expr; + RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.right, child(1)->row_desc(), + state, &build_expr)); + build_exprs_.push_back(build_expr); } - RETURN_IF_ERROR(Expr::CreateExprTrees( - pool_, tnode.hash_join_node.other_join_conjuncts, &other_join_conjunct_ctxs_)); - DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - eq_join_conjuncts.size() == 1); + // other_join_conjuncts_ are evaluated in the context of rows assembled from all build + // and probe tuples; full_row_desc is not necessarily the same as the output row desc, + // e.g., because semi joins only return the build xor probe tuples + RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts, + full_row_desc, state, &other_join_conjuncts_)); + DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || eq_join_conjuncts.size() == 1); return Status::OK(); } @@ -104,31 +111,14 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker())); runtime_profile()->PrependChild(builder_->profile()); - // build and probe exprs are evaluated in the context of the rows produced by our - // right and left children, respectively - RETURN_IF_ERROR( - Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker())); - RETURN_IF_ERROR( - Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker())); - - // Build expressions may be evaluated during probing, so must be freed. - // Probe side expr is not included in QueryMaintenance(). We cache the probe expression - // values in ExprValuesCache. Local allocations need to survive until the cache is reset - // so we need to manually free probe expr local allocations. - AddExprCtxsToFree(build_expr_ctxs_); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_, + expr_mem_pool(), &other_join_conjunct_evals_)); + AddEvaluatorsToFree(other_join_conjunct_evals_); - // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all - // build and probe tuples; full_row_desc is not necessarily the same as the output row - // desc, e.g., because semi joins only return the build xor probe tuples - RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); - RETURN_IF_ERROR( - Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker())); - AddExprCtxsToFree(other_join_conjunct_ctxs_); - - RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_, + RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, probe_exprs_, builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(), state->fragment_hash_seed(), MAX_PARTITION_DEPTH, - child(1)->row_desc().tuple_descriptors().size(), mem_tracker(), &ht_ctx_)); + child(1)->row_desc().tuple_descriptors().size(), expr_mem_pool(), &ht_ctx_)); if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime"); } @@ -163,9 +153,8 @@ void PartitionedHashJoinNode::Codegen(RuntimeState* state) { Status PartitionedHashJoinNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(BlockingJoinNode::Open(state)); - RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state)); + RETURN_IF_ERROR(ht_ctx_->Open(state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state)); if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { RETURN_IF_ERROR(InitNullAwareProbePartition()); @@ -179,7 +168,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) { // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to // be freed now as they don't get freed again till probing. Other exprs' local allocations // are freed in ExecNode::FreeLocalAllocations(). - ExprContext::FreeLocalAllocations(probe_expr_ctxs_); + ht_ctx_->FreeProbeLocalAllocations(); RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get())); RETURN_IF_ERROR(PrepareForProbe()); @@ -192,6 +181,15 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) { return Status::OK(); } +Status PartitionedHashJoinNode::QueryMaintenance(RuntimeState* state) { + // Build expressions may be evaluated during probing, so must be freed. + // Probe side expr is not included in QueryMaintenance(). We cache the probe expression + // values in ExprValuesCache. Local allocations need to survive until the cache is reset + // so we need to manually free probe expr local allocations. + if (ht_ctx_.get() != nullptr) ht_ctx_->FreeBuildLocalAllocations(); + return ExecNode::QueryMaintenance(state); +} + Status PartitionedHashJoinNode::Reset(RuntimeState* state) { if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { null_probe_output_idx_ = -1; @@ -240,15 +238,17 @@ void PartitionedHashJoinNode::CloseAndDeletePartitions() { void PartitionedHashJoinNode::Close(RuntimeState* state) { if (is_closed()) return; - if (ht_ctx_ != NULL) ht_ctx_->Close(); + if (ht_ctx_ != nullptr) ht_ctx_->Close(state); + ht_ctx_.reset(); nulls_build_batch_.reset(); output_unmatched_batch_.reset(); output_unmatched_batch_iter_.reset(); CloseAndDeletePartitions(); - if (builder_ != NULL) builder_->Close(state); - Expr::Close(build_expr_ctxs_, state); - Expr::Close(probe_expr_ctxs_, state); - Expr::Close(other_join_conjunct_ctxs_, state); + if (builder_ != nullptr) builder_->Close(state); + ScalarExprEvaluator::Close(other_join_conjunct_evals_, state); + ScalarExpr::Close(build_exprs_); + ScalarExpr::Close(probe_exprs_); + ScalarExpr::Close(other_join_conjuncts_); BlockingJoinNode::Close(state); } @@ -591,7 +591,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch // Free local allocations of the probe side expressions only after ExprValuesCache // has been reset. DCHECK(ht_ctx_->expr_values_cache()->AtEnd()); - ExprContext::FreeLocalAllocations(probe_expr_ctxs_); + ht_ctx_->FreeProbeLocalAllocations(); // We want to return as soon as we have attached a tuple stream to the out_batch // (before preparing a new partition). The attached tuple stream will be recycled @@ -665,8 +665,8 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) { // it is done by the loop in GetNext(). So, there must be exactly one partition in // 'output_build_partitions_' here. DCHECK_EQ(output_build_partitions_.size(), 1); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - const int num_conjuncts = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + const int num_conjuncts = conjuncts_.size(); RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows()); bool eos = false; @@ -684,7 +684,7 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) { output_unmatched_batch_iter_->Next()) { OutputBuildRow(out_batch, output_unmatched_batch_iter_->Get(), &out_batch_iterator); if (ExecNode::EvalConjuncts( - conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) { + conjunct_evals, num_conjuncts, out_batch_iterator.Get())) { out_batch->CommitLastRow(); out_batch_iterator.Next(); } @@ -705,8 +705,8 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) { } Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) { - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - const int num_conjuncts = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + const int num_conjuncts = conjuncts_.size(); RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows()); while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) { @@ -714,7 +714,7 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_ if (!hash_tbl_iterator_.IsMatched()) { OutputBuildRow(out_batch, hash_tbl_iterator_.GetRow(), &out_batch_iterator); if (ExecNode::EvalConjuncts( - conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) { + conjunct_evals, num_conjuncts, out_batch_iterator.Get())) { out_batch->CommitLastRow(); out_batch_iterator.Next(); } @@ -892,8 +892,8 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state, DCHECK(null_aware_probe_partition_ != NULL); DCHECK(nulls_build_batch_ != NULL); - ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; - int num_join_conjuncts = other_join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data(); + int num_join_conjuncts = other_join_conjuncts_.size(); DCHECK(probe_batch_ != NULL); BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows(); @@ -919,12 +919,11 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state, for (; probe_batch_pos_ < probe_batch_->num_rows(); ++probe_batch_pos_) { if (out_batch->AtCapacity()) break; TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_); - bool matched = false; for (int i = 0; i < nulls_build_batch_->num_rows(); ++i) { CreateOutputRow(semi_join_staging_row_, probe_row, nulls_build_batch_->GetRow(i)); if (ExecNode::EvalConjuncts( - join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) { + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { matched = true; break; } @@ -1003,8 +1002,8 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) { RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows)); if (!got_rows) return NullAwareAntiJoinError(false); - ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; - int num_join_conjuncts = other_join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data(); + int num_join_conjuncts = other_join_conjuncts_.size(); DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size()); // For each row, iterate over all rows in the build table. @@ -1015,7 +1014,7 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) { CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i), build_rows->GetRow(j)); if (ExecNode::EvalConjuncts( - join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) { + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { matched_null_probe_[i] = true; break; } @@ -1107,8 +1106,8 @@ void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) co *out << " hash_tbl="; *out << string(indent * 2, ' '); *out << "HashTbl(" - << " build_exprs=" << Expr::DebugString(build_expr_ctxs_) - << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_); + << " build_exprs=" << ScalarExpr::DebugString(build_exprs_) + << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_); *out << ")"; } @@ -1369,12 +1368,12 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch( // Codegen evaluating other join conjuncts Function* eval_other_conjuncts_fn; - RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, other_join_conjunct_ctxs_, + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_, &eval_other_conjuncts_fn, "EvalOtherConjuncts")); // Codegen evaluating conjuncts Function* eval_conjuncts_fn; - RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs_, + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn)); // Replace all call sites with codegen version http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 6d4e7f4..d6f6f18 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -117,6 +117,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { virtual void Close(RuntimeState* state); protected: + virtual Status QueryMaintenance(RuntimeState* state); virtual void AddToDebugString(int indentation_level, std::stringstream* out) const; virtual Status ProcessBuildInput(RuntimeState* state); @@ -182,9 +183,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Using a separate variable is probably faster than calling /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load. bool inline ProcessProbeRowInnerJoin( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); /// Probes and updates the hash table for the current probe row for either /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build @@ -200,9 +201,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load. template<int const JoinOp> bool inline ProcessProbeRowRightSemiJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN, /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended @@ -217,9 +218,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load. template<int const JoinOp> bool inline ProcessProbeRowLeftSemiJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status); + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, + Status* status); /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN, /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row @@ -235,18 +237,19 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// 'status' may be updated if appending to null aware BTS fails. template<int const JoinOp> bool inline ProcessProbeRowOuterJoins( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity); /// Probes 'current_probe_row_' against the the hash tables and append outputs /// to output batch. Wrapper around the join-type specific probe row functions /// declared above. template<int const JoinOp> bool inline ProcessProbeRow( - ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, - ExprContext* const* conjunct_ctxs, int num_conjuncts, - RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status); + ScalarExprEvaluator* const* other_join_conjunct_evals, + int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals, + int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, + Status* status); /// Evaluates some number of rows in 'probe_batch_' against the probe expressions /// and hashes the results to 32-bit hash values. The evaluation results and the hash @@ -389,14 +392,17 @@ class PartitionedHashJoinNode : public BlockingJoinNode { RuntimeState* runtime_state_; /// Our equi-join predicates "<lhs> = <rhs>" are separated into - /// build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0)) - std::vector<ExprContext*> build_expr_ctxs_; - std::vector<ExprContext*> probe_expr_ctxs_; + /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0)) + std::vector<ScalarExpr*> build_exprs_; + std::vector<ScalarExpr*> probe_exprs_; /// Non-equi-join conjuncts from the ON clause. - std::vector<ExprContext*> other_join_conjunct_ctxs_; + std::vector<ScalarExpr*> other_join_conjuncts_; + std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_; /// Used for hash-related functionality, such as evaluating rows and calculating hashes. + /// This owns the evaluators for the build and probe expressions used during insertion + /// and probing of the hash tables. boost::scoped_ptr<HashTableCtx> ht_ctx_; /// The iterator that corresponds to the look up of current_probe_row_. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index f513800..eeece0f 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -17,8 +17,8 @@ #include "exec/plan-root-sink.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/row-batch.h" #include "runtime/tuple-row.h" #include "service/query-result-set.h" @@ -34,24 +34,8 @@ namespace impala { const string PlanRootSink::NAME = "PLAN_ROOT_SINK"; -PlanRootSink::PlanRootSink(const RowDescriptor& row_desc, - const std::vector<TExpr>& output_exprs, const TDataSink& thrift_sink) - : DataSink(row_desc), thrift_output_exprs_(output_exprs) {} - -Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { - RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); - RETURN_IF_ERROR( - Expr::CreateExprTrees(state->obj_pool(), thrift_output_exprs_, &output_expr_ctxs_)); - RETURN_IF_ERROR( - Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - - return Status::OK(); -} - -Status PlanRootSink::Open(RuntimeState* state) { - RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state)); - return Status::OK(); -} +PlanRootSink::PlanRootSink(const RowDescriptor& row_desc) + : DataSink(row_desc) { } namespace { @@ -100,7 +84,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { DCHECK(results_ != nullptr); // List of expr values to hold evaluated rows from the query vector<void*> result_row; - result_row.resize(output_expr_ctxs_.size()); + result_row.resize(output_exprs_.size()); // List of scales for floating point values in result_row vector<int> scales; @@ -116,7 +100,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { } // Signal the consumer. results_ = nullptr; - ExprContext::FreeLocalAllocations(output_expr_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_); consumer_cv_.notify_all(); } return Status::OK(); @@ -139,7 +123,6 @@ void PlanRootSink::Close(RuntimeState* state) { // Wait for consumer to be done, in case sender tries to tear-down this sink while the // sender is still reading from it. while (!consumer_done_) sender_cv_.wait(l); - Expr::Close(output_expr_ctxs_, state); DataSink::Close(state); } @@ -165,10 +148,10 @@ Status PlanRootSink::GetNext( void PlanRootSink::GetRowValue( TupleRow* row, vector<void*>* result, vector<int>* scales) { - DCHECK(result->size() >= output_expr_ctxs_.size()); - for (int i = 0; i < output_expr_ctxs_.size(); ++i) { - (*result)[i] = output_expr_ctxs_[i]->GetValue(row); - (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale(); + DCHECK(result->size() >= output_expr_evals_.size()); + for (int i = 0; i < output_expr_evals_.size(); ++i) { + (*result)[i] = output_expr_evals_[i]->GetValue(row); + (*scales)[i] = output_expr_evals_[i]->output_scale(); } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/plan-root-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h index a9c47c3b..c45d327 100644 --- a/be/src/exec/plan-root-sink.h +++ b/be/src/exec/plan-root-sink.h @@ -27,7 +27,7 @@ namespace impala { class TupleRow; class RowBatch; class QueryResultSet; -class ExprContext; +class ScalarExprEvaluator; /// Sink which manages the handoff between a 'sender' (a fragment instance) that produces /// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows @@ -57,15 +57,10 @@ class ExprContext; /// and consumer. See IMPALA-4268. class PlanRootSink : public DataSink { public: - PlanRootSink(const RowDescriptor& row_desc, const std::vector<TExpr>& output_exprs, - const TDataSink& thrift_sink); + PlanRootSink(const RowDescriptor& row_desc); virtual std::string GetName() { return NAME; } - virtual Status Prepare(RuntimeState* state, MemTracker* tracker); - - virtual Status Open(RuntimeState* state); - /// Sends a new batch. Ownership of 'batch' remains with the sender. Blocks until the /// consumer has consumed 'batch' by calling GetNext(). virtual Status Send(RuntimeState* state, RowBatch* batch); @@ -125,12 +120,8 @@ class PlanRootSink : public DataSink { /// Set to true in Send() and FlushFinal() when the Sink() has finished producing rows. bool eos_ = false; - /// Output expressions to map plan row batches onto result set rows. - std::vector<TExpr> thrift_output_exprs_; - std::vector<ExprContext*> output_expr_ctxs_; - - /// Writes a single row into 'result' and 'scales' by evaluating output_expr_ctxs_ over - /// 'row'. + /// Writes a single row into 'result' and 'scales' by evaluating + /// output_expr_evals_ over 'row'. void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/select-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc index c8f8b1e..86aac1d 100644 --- a/be/src/exec/select-node.cc +++ b/be/src/exec/select-node.cc @@ -16,7 +16,8 @@ // under the License. #include "exec/select-node.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/raw-value.h" @@ -93,8 +94,9 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) } bool SelectNode::CopyRows(RowBatch* output_batch) { - ExprContext** conjunct_ctxs = &conjunct_ctxs_[0]; - int num_conjunct_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + int num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); while (child_row_idx_ < child_row_batch_->num_rows()) { // Add a new row to output_batch @@ -104,7 +106,7 @@ bool SelectNode::CopyRows(RowBatch* output_batch) { // Make sure to increment row idx before returning. ++child_row_idx_; - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, src_row)) { + if (EvalConjuncts(conjunct_evals, num_conjuncts, src_row)) { output_batch->CopyRow(src_row, dst_row); output_batch->CommitLastRow(); ++num_rows_returned_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-exec-exprs.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-exec-exprs.cc b/be/src/exec/sort-exec-exprs.cc deleted file mode 100644 index 3e3f60d..0000000 --- a/be/src/exec/sort-exec-exprs.cc +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/sort-exec-exprs.h" - -#include "common/names.h" - -namespace impala { - -Status SortExecExprs::Init(const TSortInfo& sort_info, ObjectPool* pool) { - return Init(sort_info.ordering_exprs, - sort_info.__isset.sort_tuple_slot_exprs ? &sort_info.sort_tuple_slot_exprs : NULL, - pool); -} - -Status SortExecExprs::Init(const vector<TExpr>& ordering_exprs, - const vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool) { - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool, ordering_exprs, &lhs_ordering_expr_ctxs_)); - - if (sort_tuple_slot_exprs != NULL) { - materialize_tuple_ = true; - RETURN_IF_ERROR(Expr::CreateExprTrees(pool, *sort_tuple_slot_exprs, - &sort_tuple_slot_expr_ctxs_)); - } else { - materialize_tuple_ = false; - } - return Status::OK(); -} - -Status SortExecExprs::Init(const vector<ExprContext*>& lhs_ordering_expr_ctxs, - const vector<ExprContext*>& rhs_ordering_expr_ctxs) { - lhs_ordering_expr_ctxs_ = lhs_ordering_expr_ctxs; - rhs_ordering_expr_ctxs_ = rhs_ordering_expr_ctxs; - return Status::OK(); -} - -Status SortExecExprs::Prepare(RuntimeState* state, const RowDescriptor& child_row_desc, - const RowDescriptor& output_row_desc, MemTracker* expr_mem_tracker) { - if (materialize_tuple_) { - RETURN_IF_ERROR(Expr::Prepare( - sort_tuple_slot_expr_ctxs_, state, child_row_desc, expr_mem_tracker)); - } - RETURN_IF_ERROR(Expr::Prepare( - lhs_ordering_expr_ctxs_, state, output_row_desc, expr_mem_tracker)); - return Status::OK(); -} - -Status SortExecExprs::Open(RuntimeState* state) { - if (materialize_tuple_) { - RETURN_IF_ERROR(Expr::Open(sort_tuple_slot_expr_ctxs_, state)); - } - RETURN_IF_ERROR(Expr::Open(lhs_ordering_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::CloneIfNotExists( - lhs_ordering_expr_ctxs_, state, &rhs_ordering_expr_ctxs_)); - return Status::OK(); -} - -void SortExecExprs::Close(RuntimeState* state) { - if (materialize_tuple_) { - Expr::Close(sort_tuple_slot_expr_ctxs_, state); - } - Expr::Close(rhs_ordering_expr_ctxs_, state); - Expr::Close(lhs_ordering_expr_ctxs_, state); -} - -} //namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-exec-exprs.h ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-exec-exprs.h b/be/src/exec/sort-exec-exprs.h deleted file mode 100644 index 782aeeb..0000000 --- a/be/src/exec/sort-exec-exprs.h +++ /dev/null @@ -1,92 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_EXEC_SORT_EXEC_EXPRS_H -#define IMPALA_EXEC_SORT_EXEC_EXPRS_H - -#include "exprs/expr.h" -#include "runtime/runtime-state.h" - -namespace impala { - -/// Helper class to Prepare() , Open() and Close() the ordering expressions used to perform -/// comparisons in a sort. Used by TopNNode, SortNode, and MergingExchangeNode. When two -/// rows are compared, the ordering expressions are evaluated once for each side. -/// TopN and Sort materialize input rows into a single tuple before sorting. -/// If materialize_tuple_ is true, SortExecExprs also stores the slot expressions used to -/// materialize the sort tuples. -class SortExecExprs { - public: - /// Initialize the expressions from a TSortInfo using the specified pool. - Status Init(const TSortInfo& sort_info, ObjectPool* pool); - - /// Initialize the ordering and (optionally) materialization expressions from the thrift - /// TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the tuple is not - /// materialized. - Status Init(const std::vector<TExpr>& ordering_exprs, - const std::vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool); - - /// Prepare all expressions used for sorting and tuple materialization. - Status Prepare(RuntimeState* state, const RowDescriptor& child_row_desc, - const RowDescriptor& output_row_desc, MemTracker* expr_mem_tracker); - - /// Open all expressions used for sorting and tuple materialization. - Status Open(RuntimeState* state); - - /// Close all expressions used for sorting and tuple materialization. - void Close(RuntimeState* state); - - const std::vector<ExprContext*>& sort_tuple_slot_expr_ctxs() const { - return sort_tuple_slot_expr_ctxs_; - } - - /// Populated in Prepare() (empty before then) - const std::vector<ExprContext*>& lhs_ordering_expr_ctxs() const { - return lhs_ordering_expr_ctxs_; - } - /// Populated in Open() (empty before then) - const std::vector<ExprContext*>& rhs_ordering_expr_ctxs() const { - return rhs_ordering_expr_ctxs_; - } - - private: - // Give access to testing Init() - friend class DataStreamTest; - - /// Create two ExprContexts for evaluating over the TupleRows. - std::vector<ExprContext*> lhs_ordering_expr_ctxs_; - std::vector<ExprContext*> rhs_ordering_expr_ctxs_; - - /// If true, the tuples to be sorted are materialized by - /// sort_tuple_slot_exprs_ before the actual sort is performed. - bool materialize_tuple_; - - /// Expressions used to materialize slots in the tuples to be sorted. - /// One expr per slot in the materialized tuple. Valid only if - /// materialize_tuple_ is true. - std::vector<ExprContext*> sort_tuple_slot_expr_ctxs_; - - /// Initialize directly from already-created ExprContexts. Callers should manually call - /// Prepare(), Open(), and Close() on input ExprContexts (instead of calling the - /// analogous functions in this class). Used for testing. - Status Init(const std::vector<ExprContext*>& lhs_ordering_expr_ctxs, - const std::vector<ExprContext*>& rhs_ordering_expr_ctxs); -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index 9660ed3..6d93130 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -16,7 +16,6 @@ // under the License. #include "exec/sort-node.h" -#include "exec/sort-exec-exprs.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/sorted-run-merger.h" @@ -37,8 +36,13 @@ SortNode::~SortNode() { } Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) { + const TSortInfo& tsort_info = tnode.sort_node.sort_info; RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_)); + RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_, + state, &ordering_exprs_)); + DCHECK(tsort_info.__isset.sort_tuple_slot_exprs); + RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs, + child(0)->row_desc(), state, &sort_tuple_exprs_)); is_asc_order_ = tnode.sort_node.sort_info.is_asc_order; nulls_first_ = tnode.sort_node.sort_info.nulls_first; return Status::OK(); @@ -47,14 +51,10 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status SortNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); - RETURN_IF_ERROR(sort_exec_exprs_.Prepare( - state, child(0)->row_desc(), row_descriptor_, expr_mem_tracker())); - AddExprCtxsToFree(sort_exec_exprs_); - less_than_.reset(new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_)); - sorter_.reset( - new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), - &row_descriptor_, mem_tracker(), runtime_profile(), state)); - RETURN_IF_ERROR(sorter_->Prepare()); + less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); + sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, + &row_descriptor_, mem_tracker(), runtime_profile(), state)); + RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool())); AddCodegenDisabledMessage(state); return Status::OK(); } @@ -63,7 +63,6 @@ void SortNode::Codegen(RuntimeState* state) { DCHECK(state->ShouldCodegen()); ExecNode::Codegen(state); if (IsNodeCodegenDisabled()) return; - Status codegen_status = less_than_->Codegen(state); runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status); } @@ -71,11 +70,11 @@ void SortNode::Codegen(RuntimeState* state) { Status SortNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); - RETURN_IF_ERROR(sort_exec_exprs_.Open(state)); + RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool())); + RETURN_IF_ERROR(sorter_->Open()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->Open(state)); - RETURN_IF_ERROR(sorter_->Open()); // The child has been opened and the sorter created. Sort the input. // The final merge is done on-demand as rows are requested in GetNext(). @@ -135,16 +134,22 @@ Status SortNode::Reset(RuntimeState* state) { void SortNode::Close(RuntimeState* state) { if (is_closed()) return; - sort_exec_exprs_.Close(state); - if (sorter_ != NULL) sorter_->Close(); + if (less_than_.get() != nullptr) less_than_->Close(state); + if (sorter_ != nullptr) sorter_->Close(state); sorter_.reset(); + ScalarExpr::Close(ordering_exprs_); + ScalarExpr::Close(sort_tuple_exprs_); ExecNode::Close(state); } +Status SortNode::QueryMaintenance(RuntimeState* state) { + sorter_->FreeLocalAllocations(); + return ExecNode::QueryMaintenance(state); +} + void SortNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); - *out << "SortNode(" - << Expr::DebugString(sort_exec_exprs_.lhs_ordering_expr_ctxs()); + *out << "SortNode(" << ScalarExpr::DebugString(ordering_exprs_); for (int i = 0; i < is_asc_order_.size(); ++i) { *out << (i > 0 ? " " : "") << (is_asc_order_[i] ? "asc" : "desc") http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h index 75513ba..cbe5b68 100644 --- a/be/src/exec/sort-node.h +++ b/be/src/exec/sort-node.h @@ -19,7 +19,6 @@ #define IMPALA_EXEC_SORT_NODE_H #include "exec/exec-node.h" -#include "exec/sort-exec-exprs.h" #include "runtime/sorter.h" #include "runtime/buffered-block-mgr.h" @@ -29,7 +28,7 @@ namespace impala { /// to disk if the input is larger than available memory. /// Uses Sorter and BufferedBlockMgr for the external sort implementation. /// Input rows to SortNode are materialized by the Sorter into a single tuple -/// using the expressions specified in sort_exec_exprs_. +/// using the expressions specified in sort_tuple_exprs_. /// In GetNext(), SortNode passes in the output batch to the sorter instance created /// in Open() to fill it with sorted rows. /// If a merge phase was performed in the sort, sorted rows are deep copied into @@ -48,6 +47,7 @@ class SortNode : public ExecNode { virtual void Close(RuntimeState* state); protected: + virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; private: @@ -61,7 +61,12 @@ class SortNode : public ExecNode { boost::scoped_ptr<TupleRowComparator> less_than_; /// Expressions and parameters used for tuple materialization and tuple comparison. - SortExecExprs sort_exec_exprs_; + std::vector<ScalarExpr*> ordering_exprs_; + + /// Expressions used to materialize slots in the tuples to be sorted. + /// One expr per slot in the materialized tuple. + std::vector<ScalarExpr*> sort_tuple_exprs_; + std::vector<bool> is_asc_order_; std::vector<bool> nulls_first_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/subplan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc index 4c80125..98ef13a 100644 --- a/be/src/exec/subplan-node.cc +++ b/be/src/exec/subplan-node.cc @@ -39,22 +39,28 @@ SubplanNode::SubplanNode(ObjectPool* pool, const TPlanNode& tnode, Status SubplanNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK_EQ(children_.size(), 2); - SetContainingSubplan(this, child(1)); + RETURN_IF_ERROR(SetContainingSubplan(state, this, child(1))); return Status::OK(); } -void SubplanNode::SetContainingSubplan(SubplanNode* ancestor, ExecNode* node) { +Status SubplanNode::SetContainingSubplan( + RuntimeState* state, SubplanNode* ancestor, ExecNode* node) { node->set_containing_subplan(ancestor); if (node->type() == TPlanNodeType::SUBPLAN_NODE) { // Only traverse the first child and not the second one, because the Subplan // parent of nodes inside it should be 'node' and not 'ancestor'. - SetContainingSubplan(ancestor, node->child(0)); + RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(0))); } else { + if (node->type() == TPlanNodeType::UNNEST_NODE) { + UnnestNode* unnest_node = reinterpret_cast<UnnestNode*>(node); + RETURN_IF_ERROR(unnest_node->InitCollExpr(state)); + } int num_children = node->num_children(); for (int i = 0; i < num_children; ++i) { - SetContainingSubplan(ancestor, node->child(i)); + RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(i))); } } + return Status::OK(); } Status SubplanNode::Prepare(RuntimeState* state) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/subplan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h index 9aa4f2e..bf13ca1 100644 --- a/be/src/exec/subplan-node.h +++ b/be/src/exec/subplan-node.h @@ -19,7 +19,7 @@ #define IMPALA_EXEC_SUBPLAN_NODE_H_ #include "exec/exec-node.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" namespace impala { @@ -62,9 +62,10 @@ class SubplanNode : public ExecNode { friend class UnnestNode; /// Sets 'ancestor' as the containing Subplan in all exec nodes inside the exec-node - /// tree rooted at 'node'. - /// Does not traverse the second child of SubplanNodes within 'node' - void SetContainingSubplan(SubplanNode* ancestor, ExecNode* node); + /// tree rooted at 'node' and does any initialization that is required as a result of + /// setting the subplan. Doesn't traverse the second child of SubplanNodes within + /// 'node'. + Status SetContainingSubplan(RuntimeState* state, SubplanNode* ancestor, ExecNode* node); /// Returns the current row from child(0) or NULL if no rows from child(0) have been /// retrieved yet (GetNext() has not yet been called). This function is called by http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/topn-node-ir.cc b/be/src/exec/topn-node-ir.cc index f0eaeaa..0acd724 100644 --- a/be/src/exec/topn-node-ir.cc +++ b/be/src/exec/topn-node-ir.cc @@ -31,18 +31,18 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) { if (priority_queue_->size() < limit_ + offset_) { insert_tuple = reinterpret_cast<Tuple*>( - tuple_pool_->Allocate(materialized_tuple_desc_->byte_size())); - insert_tuple->MaterializeExprs<false, false>(input_row, *materialized_tuple_desc_, - sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), tuple_pool_.get()); + tuple_pool_->Allocate(output_tuple_desc_->byte_size())); + insert_tuple->MaterializeExprs<false, false>(input_row, *output_tuple_desc_, + output_tuple_expr_evals_, tuple_pool_.get()); } else { DCHECK(!priority_queue_->empty()); Tuple* top_tuple = priority_queue_->top(); - tmp_tuple_->MaterializeExprs<false, true>(input_row, *materialized_tuple_desc_, - sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), nullptr); + tmp_tuple_->MaterializeExprs<false, true>(input_row, *output_tuple_desc_, + output_tuple_expr_evals_, nullptr); if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) { // TODO: DeepCopy() will allocate new buffers for the string data. This needs // to be fixed to use a freelist - tmp_tuple_->DeepCopy(top_tuple, *materialized_tuple_desc_, tuple_pool_.get()); + tmp_tuple_->DeepCopy(top_tuple, *output_tuple_desc_, tuple_pool_.get()); insert_tuple = top_tuple; priority_queue_->pop(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc index 54bfe8f..73a8108 100644 --- a/be/src/exec/topn-node.cc +++ b/be/src/exec/topn-node.cc @@ -20,7 +20,7 @@ #include <sstream> #include "codegen/llvm-codegen.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/descriptors.h" #include "runtime/mem-pool.h" #include "runtime/row-batch.h" @@ -42,7 +42,7 @@ using namespace llvm; TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), - materialized_tuple_desc_(NULL), + output_tuple_desc_(row_descriptor_.tuple_descriptors()[0]), tuple_row_less_than_(NULL), tmp_tuple_(NULL), tuple_pool_(NULL), @@ -52,31 +52,34 @@ TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl } Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* state) { + const TSortInfo& tsort_info = tnode.sort_node.sort_info; RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_)); + RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_, + state, &ordering_exprs_)); + DCHECK(tsort_info.__isset.sort_tuple_slot_exprs); + RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs, + child(0)->row_desc(), state, &output_tuple_exprs_)); is_asc_order_ = tnode.sort_node.sort_info.is_asc_order; nulls_first_ = tnode.sort_node.sort_info.nulls_first; - - DCHECK_EQ(conjunct_ctxs_.size(), 0) + DCHECK_EQ(conjuncts_.size(), 0) << "TopNNode should never have predicates to evaluate."; - return Status::OK(); } Status TopNNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); + DCHECK(output_tuple_desc_ != nullptr); RETURN_IF_ERROR(ExecNode::Prepare(state)); tuple_pool_.reset(new MemPool(mem_tracker())); - materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0]; - RETURN_IF_ERROR(sort_exec_exprs_.Prepare( - state, child(0)->row_desc(), row_descriptor_, expr_mem_tracker())); - AddExprCtxsToFree(sort_exec_exprs_); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_, + expr_mem_pool(), &output_tuple_expr_evals_)); + AddEvaluatorsToFree(output_tuple_expr_evals_); tuple_row_less_than_.reset( - new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_)); + new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); priority_queue_.reset( new priority_queue<Tuple*, vector<Tuple*>, ComparatorWrapper<TupleRowComparator>>( *tuple_row_less_than_)); - materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0]; + output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0]; insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime"); AddCodegenDisabledMessage(state); return Status::OK(); @@ -99,17 +102,17 @@ void TopNNode::Codegen(RuntimeState* state) { // Generate two MaterializeExprs() functions, one using tuple_pool_ and // one with no pool. - DCHECK(materialized_tuple_desc_ != NULL); + DCHECK(output_tuple_desc_ != NULL); Function* materialize_exprs_tuple_pool_fn; Function* materialize_exprs_no_pool_fn; codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, - *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), + *output_tuple_desc_, output_tuple_exprs_, true, &materialize_exprs_tuple_pool_fn); if (codegen_status.ok()) { codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, - *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), + *output_tuple_desc_, output_tuple_exprs_, false, &materialize_exprs_no_pool_fn); if (codegen_status.ok()) { @@ -134,13 +137,14 @@ void TopNNode::Codegen(RuntimeState* state) { Status TopNNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); + RETURN_IF_ERROR(tuple_row_less_than_->Open(pool_, state, expr_mem_pool())); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(output_tuple_expr_evals_, state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); - RETURN_IF_ERROR(sort_exec_exprs_.Open(state)); // Allocate memory for a temporary tuple. tmp_tuple_ = reinterpret_cast<Tuple*>( - tuple_pool_->Allocate(materialized_tuple_desc_->byte_size())); + tuple_pool_->Allocate(output_tuple_desc_->byte_size())); RETURN_IF_ERROR(child(0)->Open(state)); @@ -213,11 +217,19 @@ Status TopNNode::Reset(RuntimeState* state) { void TopNNode::Close(RuntimeState* state) { if (is_closed()) return; - if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll(); - sort_exec_exprs_.Close(state); + if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll(); + if (tuple_row_less_than_.get() != nullptr) tuple_row_less_than_->Close(state); + ScalarExprEvaluator::Close(output_tuple_expr_evals_, state); + ScalarExpr::Close(ordering_exprs_); + ScalarExpr::Close(output_tuple_exprs_); ExecNode::Close(state); } +Status TopNNode::QueryMaintenance(RuntimeState* state) { + tuple_row_less_than_->FreeLocalAllocations(); + return ExecNode::QueryMaintenance(state); +} + // Reverse the order of the tuples in the priority queue void TopNNode::PrepareForOutput() { sorted_top_n_.resize(priority_queue_->size()); @@ -236,7 +248,7 @@ void TopNNode::PrepareForOutput() { void TopNNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "TopNNode(" - << Expr::DebugString(sort_exec_exprs_.lhs_ordering_expr_ctxs()); + << ScalarExpr::DebugString(ordering_exprs_); for (int i = 0; i < is_asc_order_.size(); ++i) { *out << (i > 0 ? " " : "") << (is_asc_order_[i] ? "asc" : "desc") http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h index 5bd7ded..d7daacb 100644 --- a/be/src/exec/topn-node.h +++ b/be/src/exec/topn-node.h @@ -24,7 +24,6 @@ #include "codegen/impala-ir.h" #include "exec/exec-node.h" -#include "exec/sort-exec-exprs.h" #include "runtime/descriptors.h" // for TupleId #include "util/tuple-row-compare.h" @@ -52,6 +51,7 @@ class TopNNode : public ExecNode { virtual void Close(RuntimeState* state); protected: + virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; private: @@ -71,14 +71,18 @@ class TopNNode : public ExecNode { /// Number of rows to skip. int64_t offset_; - /// sort_exec_exprs_ contains the ordering expressions used for tuple comparison and - /// the materialization exprs for the output tuple. - SortExecExprs sort_exec_exprs_; + /// Ordering expressions used for tuple comparison. + std::vector<ScalarExpr*> ordering_exprs_; + + /// Materialization exprs for the output tuple and their evaluators. + std::vector<ScalarExpr*> output_tuple_exprs_; + std::vector<ScalarExprEvaluator*> output_tuple_expr_evals_; + std::vector<bool> is_asc_order_; std::vector<bool> nulls_first_; /// Cached descriptor for the materialized tuple. Assigned in Prepare(). - TupleDescriptor* materialized_tuple_desc_; + TupleDescriptor* output_tuple_desc_; /// Comparator for priority_queue_. boost::scoped_ptr<TupleRowComparator> tuple_row_less_than_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node-ir.cc b/be/src/exec/union-node-ir.cc index 3d19f07..21bc205 100644 --- a/be/src/exec/union-node-ir.cc +++ b/be/src/exec/union-node-ir.cc @@ -20,12 +20,13 @@ using namespace impala; -void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(const std::vector<ExprContext*>& exprs, - TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) { +void IR_ALWAYS_INLINE UnionNode::MaterializeExprs( + const vector<ScalarExprEvaluator*>& evals, TupleRow* row, uint8_t* tuple_buf, + RowBatch* dst_batch) { DCHECK(!dst_batch->AtCapacity()); Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf); TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow()); - dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs, + dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, evals, dst_batch->tuple_data_pool()); dst_row->SetTuple(0, dst_tuple); dst_batch->CommitLastRow(); @@ -37,13 +38,14 @@ void UnionNode::MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf) { RowBatch* child_batch = child_batch_.get(); int tuple_byte_size = tuple_desc_->byte_size(); uint8_t* cur_tuple = *tuple_buf; - const std::vector<ExprContext*>& child_exprs = child_expr_lists_[child_idx_]; + const std::vector<ScalarExprEvaluator*>& child_expr_evals = + child_expr_evals_lists_[child_idx_]; int num_rows_to_process = std::min(child_batch->num_rows() - child_row_idx_, dst_batch->capacity() - dst_batch->num_rows()); FOREACH_ROW_LIMIT(child_batch, child_row_idx_, num_rows_to_process, batch_iter) { TupleRow* child_row = batch_iter.Get(); - MaterializeExprs(child_exprs, child_row, cur_tuple, dst_batch); + MaterializeExprs(child_expr_evals, child_row, cur_tuple, dst_batch); cur_tuple += tuple_byte_size; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index e1912c9..d8707da 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -17,8 +17,8 @@ #include "codegen/llvm-codegen.h" #include "exec/union-node.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/tuple.h" @@ -35,32 +35,37 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), tuple_id_(tnode.union_node.tuple_id), - tuple_desc_(nullptr), + tuple_desc_(descs.GetTupleDescriptor(tuple_id_)), first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx), child_idx_(0), child_batch_(nullptr), child_row_idx_(0), child_eos_(false), - const_expr_list_idx_(0), + const_exprs_lists_idx_(0), to_close_child_idx_(-1) { } Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK(tnode.__isset.union_node); - DCHECK_EQ(conjunct_ctxs_.size(), 0); - // Create const_expr_ctx_lists_ from thrift exprs. + DCHECK_EQ(conjuncts_.size(), 0); + DCHECK(tuple_desc_ != nullptr); + // Create const_exprs_lists_ from thrift exprs. const vector<vector<TExpr>>& const_texpr_lists = tnode.union_node.const_expr_lists; for (const vector<TExpr>& texprs : const_texpr_lists) { - vector<ExprContext*> ctxs; - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs)); - const_expr_lists_.push_back(ctxs); + vector<ScalarExpr*> const_exprs; + RETURN_IF_ERROR(ScalarExpr::Create(texprs, row_desc(), state, &const_exprs)); + DCHECK_EQ(const_exprs.size(), tuple_desc_->slots().size()); + const_exprs_lists_.push_back(const_exprs); } - // Create result_expr_ctx_lists_ from thrift exprs. - const vector<vector<TExpr>>& result_texpr_lists = tnode.union_node.result_expr_lists; - for (const vector<TExpr>& texprs : result_texpr_lists) { - vector<ExprContext*> ctxs; - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs)); - child_expr_lists_.push_back(ctxs); + // Create child_exprs_lists_ from thrift exprs. + const vector<vector<TExpr>>& thrift_result_exprs = tnode.union_node.result_expr_lists; + for (int i = 0; i < thrift_result_exprs.size(); ++i) { + const vector<TExpr>& texprs = thrift_result_exprs[i]; + vector<ScalarExpr*> child_exprs; + RETURN_IF_ERROR( + ScalarExpr::Create(texprs, child(i)->row_desc(), state, &child_exprs)); + child_exprs_lists_.push_back(child_exprs); + DCHECK_EQ(child_exprs.size(), tuple_desc_->slots().size()); } return Status::OK(); } @@ -68,23 +73,25 @@ Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status UnionNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); - tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); DCHECK(tuple_desc_ != nullptr); - codegend_union_materialize_batch_fns_.resize(child_expr_lists_.size()); + codegend_union_materialize_batch_fns_.resize(child_exprs_lists_.size()); // Prepare const expr lists. - for (const vector<ExprContext*>& exprs : const_expr_lists_) { - RETURN_IF_ERROR(Expr::Prepare(exprs, state, row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(exprs); - DCHECK_EQ(exprs.size(), tuple_desc_->slots().size()); + for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) { + vector<ScalarExprEvaluator*> const_expr_evals; + RETURN_IF_ERROR(ScalarExprEvaluator::Create(const_exprs, state, pool_, + expr_mem_pool(), &const_expr_evals)); + AddEvaluatorsToFree(const_expr_evals); + const_expr_evals_lists_.push_back(const_expr_evals); } // Prepare result expr lists. - for (int i = 0; i < child_expr_lists_.size(); ++i) { - RETURN_IF_ERROR(Expr::Prepare( - child_expr_lists_[i], state, child(i)->row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(child_expr_lists_[i]); - DCHECK_EQ(child_expr_lists_[i].size(), tuple_desc_->slots().size()); + for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) { + vector<ScalarExprEvaluator*> child_expr_evals; + RETURN_IF_ERROR(ScalarExprEvaluator::Create(child_exprs, state, pool_, + expr_mem_pool(), &child_expr_evals)); + AddEvaluatorsToFree(child_expr_evals); + child_expr_evals_lists_.push_back(child_expr_evals); } return Status::OK(); } @@ -98,12 +105,12 @@ void UnionNode::Codegen(RuntimeState* state) { DCHECK(codegen != nullptr); std::stringstream codegen_message; Status codegen_status; - for (int i = 0; i < child_expr_lists_.size(); ++i) { + for (int i = 0; i < child_exprs_lists_.size(); ++i) { if (IsChildPassthrough(i)) continue; llvm::Function* tuple_materialize_exprs_fn; codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, *tuple_desc_, - child_expr_lists_[i], true, &tuple_materialize_exprs_fn); + child_exprs_lists_[i], true, &tuple_materialize_exprs_fn); if (!codegen_status.ok()) { // Codegen may fail in some corner cases (e.g. we don't handle TYPE_CHAR). If this // happens, abort codegen for this and the remaining children. @@ -137,12 +144,12 @@ Status UnionNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); // Open const expr lists. - for (const vector<ExprContext*>& exprs : const_expr_lists_) { - RETURN_IF_ERROR(Expr::Open(exprs, state)); + for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) { + RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state)); } // Open result expr lists. - for (const vector<ExprContext*>& exprs : child_expr_lists_) { - RETURN_IF_ERROR(Expr::Open(exprs, state)); + for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) { + RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state)); } // Ensures that rows are available for clients to fetch after this Open() has @@ -247,7 +254,7 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state, RowBatch* row_batch) Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) { DCHECK(state->instance_ctx().per_fragment_instance_idx == 0 || IsInSubplan()); - DCHECK_LT(const_expr_list_idx_, const_expr_lists_.size()); + DCHECK_LT(const_exprs_lists_idx_, const_expr_evals_lists_.size()); // Create new tuple buffer for row_batch. int64_t tuple_buf_size; uint8_t* tuple_buf; @@ -255,11 +262,11 @@ Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) { row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf)); memset(tuple_buf, 0, tuple_buf_size); - while (const_expr_list_idx_ < const_expr_lists_.size() && !row_batch->AtCapacity()) { + while (const_exprs_lists_idx_ < const_exprs_lists_.size() && !row_batch->AtCapacity()) { MaterializeExprs( - const_expr_lists_[const_expr_list_idx_], nullptr, tuple_buf, row_batch); + const_expr_evals_lists_[const_exprs_lists_idx_], nullptr, tuple_buf, row_batch); tuple_buf += tuple_desc_->byte_size(); - ++const_expr_list_idx_; + ++const_exprs_lists_idx_; } return Status::OK(); @@ -314,7 +321,7 @@ Status UnionNode::Reset(RuntimeState* state) { child_batch_.reset(); child_row_idx_ = 0; child_eos_ = false; - const_expr_list_idx_ = 0; + const_exprs_lists_idx_ = 0; // Since passthrough is disabled in subplans, verify that there is no passthrough child // that needs to be closed. DCHECK_EQ(to_close_child_idx_, -1); @@ -324,11 +331,17 @@ Status UnionNode::Reset(RuntimeState* state) { void UnionNode::Close(RuntimeState* state) { if (is_closed()) return; child_batch_.reset(); - for (const vector<ExprContext*>& exprs : const_expr_lists_) { - Expr::Close(exprs, state); + for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) { + ScalarExprEvaluator::Close(evals, state); + } + for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) { + ScalarExprEvaluator::Close(evals, state); + } + for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) { + ScalarExpr::Close(const_exprs); } - for (const vector<ExprContext*>& exprs : child_expr_lists_) { - Expr::Close(exprs, state); + for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) { + ScalarExpr::Close(child_exprs); } ExecNode::Close(state); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h index 79fdfba..9c83276 100644 --- a/be/src/exec/union-node.h +++ b/be/src/exec/union-node.h @@ -29,7 +29,9 @@ namespace impala { class DescriptorTbl; -class ExprContext; +class RuntimeState; +class ScalarExpr; +class ScalarExprEvaluator; class Tuple; class TupleRow; class TPlanNode; @@ -67,10 +69,12 @@ class UnionNode : public ExecNode { /// Const exprs materialized by this node. These exprs don't refer to any children. /// Only materialized by the first fragment instance to avoid duplication. - std::vector<std::vector<ExprContext*>> const_expr_lists_; + std::vector<std::vector<ScalarExpr*>> const_exprs_lists_; + std::vector<std::vector<ScalarExprEvaluator*>> const_expr_evals_lists_; /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. - std::vector<std::vector<ExprContext*>> child_expr_lists_; + std::vector<std::vector<ScalarExpr*>> child_exprs_lists_; + std::vector<std::vector<ScalarExprEvaluator*>> child_expr_evals_lists_; ///////////////////////////////////////// /// BEGIN: Members that must be Reset() @@ -96,7 +100,7 @@ class UnionNode : public ExecNode { bool child_eos_; /// Index of current const result expr list. - int const_expr_list_idx_; + int const_exprs_lists_idx_; /// Index of the child that needs to be closed on the next GetNext() call. Should be set /// to -1 if no child needs to be closed. @@ -126,7 +130,7 @@ class UnionNode : public ExecNode { /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'. /// and appends the new tuple to 'dst_batch'. Increments 'num_rows_returned_'. - void MaterializeExprs(const std::vector<ExprContext*>& exprs, + void MaterializeExprs(const std::vector<ScalarExprEvaluator*>& evaluators, TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch); /// Returns true if the child at 'child_idx' can be passed through. @@ -150,7 +154,7 @@ class UnionNode : public ExecNode { /// Returns true if there are still rows to be returned from constant expressions. bool HasMoreConst(const RuntimeState* state) const { return (state->instance_ctx().per_fragment_instance_idx == 0 || IsInSubplan()) && - const_expr_list_idx_ < const_expr_lists_.size(); + const_exprs_lists_idx_ < const_exprs_lists_.size(); } };
