IMPALA-3586: Clean up union-node.h/cc to enable improvements. This patch does not itself address IMPALA-3586, but it cleans up the code in union-node.h/cc so that the performance improvements tracked by IMPALA-3586 are easier to implement.
The major simplification is to remove conjunct evaluation, since the planner no longer assigns conjuncts to a union node. Conjuncts are always pushed down to the union operands. Change-Id: Ia5fc23985e8d51acb8a6920717ce4e2f0254fe70 Reviewed-on: http://gerrit.cloudera.org:8080/4817 Reviewed-by: Alex Behm <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e9a4077b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e9a4077b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e9a4077b Branch: refs/heads/master Commit: e9a4077b3589ee0a6debadb4b5efc13e05ace773 Parents: d802f32 Author: Alex Behm <[email protected]> Authored: Thu Oct 20 21:16:33 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Nov 5 01:00:00 2016 +0000 ---------------------------------------------------------------------- be/src/exec/union-node.cc | 192 +++++++++++++++++------------------------ be/src/exec/union-node.h | 36 ++++---- 2 files changed, 95 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e9a4077b/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index 8b56099..f4d3b69 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -20,7 +20,7 @@ #include "exprs/expr-context.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" -#include "runtime/raw-value.h" +#include "runtime/tuple.h" #include "runtime/tuple-row.h" #include "util/runtime-profile-counters.h" #include "gen-cpp/PlanNodes_types.h" @@ -30,33 +30,34 @@ namespace impala { UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs) 
: ExecNode(pool, tnode, descs), tuple_id_(tnode.union_node.tuple_id), tuple_desc_(NULL), - const_result_expr_idx_(0), child_idx_(0), - child_row_batch_(NULL), + child_batch_(NULL), child_row_idx_(0), - child_eos_(false) { + child_eos_(false), + const_expr_list_idx_(0) { } Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK(tnode.__isset.union_node); + DCHECK_EQ(conjunct_ctxs_.size(), 0); // Create const_expr_ctx_lists_ from thrift exprs. const vector<vector<TExpr>>& const_texpr_lists = tnode.union_node.const_expr_lists; - for (int i = 0; i < const_texpr_lists.size(); ++i) { + for (const vector<TExpr>& texprs : const_texpr_lists) { vector<ExprContext*> ctxs; - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, const_texpr_lists[i], &ctxs)); - const_result_expr_ctx_lists_.push_back(ctxs); + RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs)); + const_expr_lists_.push_back(ctxs); } // Create result_expr_ctx_lists_ from thrift exprs. const vector<vector<TExpr>>& result_texpr_lists = tnode.union_node.result_expr_lists; - for (int i = 0; i < result_texpr_lists.size(); ++i) { + for (const vector<TExpr>& texprs : result_texpr_lists) { vector<ExprContext*> ctxs; - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, result_texpr_lists[i], &ctxs)); - result_expr_ctx_lists_.push_back(ctxs); + RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs)); + child_expr_lists_.push_back(ctxs); } return Status::OK(); } @@ -68,19 +69,18 @@ Status UnionNode::Prepare(RuntimeState* state) { DCHECK(tuple_desc_ != NULL); // Prepare const expr lists. 
- for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) { - RETURN_IF_ERROR(Expr::Prepare( - const_result_expr_ctx_lists_[i], state, row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(const_result_expr_ctx_lists_[i]); - DCHECK_EQ(const_result_expr_ctx_lists_[i].size(), tuple_desc_->slots().size()); + for (const vector<ExprContext*>& exprs : const_expr_lists_) { + RETURN_IF_ERROR(Expr::Prepare(exprs, state, row_desc(), expr_mem_tracker())); + AddExprCtxsToFree(exprs); + DCHECK_EQ(exprs.size(), tuple_desc_->slots().size()); } // Prepare result expr lists. - for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) { + for (int i = 0; i < child_expr_lists_.size(); ++i) { RETURN_IF_ERROR(Expr::Prepare( - result_expr_ctx_lists_[i], state, child(i)->row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(result_expr_ctx_lists_[i]); - DCHECK_EQ(result_expr_ctx_lists_[i].size(), tuple_desc_->slots().size()); + child_expr_lists_[i], state, child(i)->row_desc(), expr_mem_tracker())); + AddExprCtxsToFree(child_expr_lists_[i]); + DCHECK_EQ(child_expr_lists_[i].size(), tuple_desc_->slots().size()); } return Status::OK(); } @@ -89,12 +89,12 @@ Status UnionNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); // Open const expr lists. - for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) { - RETURN_IF_ERROR(Expr::Open(const_result_expr_ctx_lists_[i], state)); + for (const vector<ExprContext*>& exprs : const_expr_lists_) { + RETURN_IF_ERROR(Expr::Open(exprs, state)); } // Open result expr lists. - for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) { - RETURN_IF_ERROR(Expr::Open(result_expr_ctx_lists_[i], state)); + for (const vector<ExprContext*>& exprs : child_expr_lists_) { + RETURN_IF_ERROR(Expr::Open(exprs, state)); } // Open and fetch from the first child if there is one. 
Ensures that rows are @@ -106,11 +106,11 @@ Status UnionNode::Open(RuntimeState* state) { Status UnionNode::OpenCurrentChild(RuntimeState* state) { DCHECK_LT(child_idx_, children_.size()); - child_row_batch_.reset(new RowBatch( + child_batch_.reset(new RowBatch( child(child_idx_)->row_desc(), state->batch_size(), mem_tracker())); // Open child and fetch the first row batch. RETURN_IF_ERROR(child(child_idx_)->Open(state)); - RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_row_batch_.get(), + RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(), &child_eos_)); child_row_idx_ = 0; return Status::OK(); @@ -122,35 +122,38 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); // Create new tuple buffer for row_batch. - int64_t tuple_buffer_size; - uint8_t* tuple_buffer; + int64_t tuple_buf_size; + uint8_t* tuple_buf; RETURN_IF_ERROR( - row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer)); - Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer); - tuple->Init(tuple_buffer_size); + row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf)); + memset(tuple_buf, 0, tuple_buf_size); // Fetch from children, evaluate corresponding exprs and materialize. while (child_idx_ < children_.size()) { // Row batch was either never set or we're moving on to a different child. - if (child_row_batch_.get() == NULL) RETURN_IF_ERROR(OpenCurrentChild(state)); + if (child_batch_.get() == NULL) RETURN_IF_ERROR(OpenCurrentChild(state)); - // Start (or continue) consuming row batches from current child. + // Start or continue consuming row batches from current child. while (true) { - RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); - // Continue materializing exprs on child_row_batch_ into row batch. 
- RETURN_IF_ERROR(EvalAndMaterializeExprs(result_expr_ctx_lists_[child_idx_], false, - &tuple, row_batch)); - if (row_batch->AtCapacity() || ReachedLimit()) { + // Start or continue processing current child batch. + while (child_row_idx_ < child_batch_->num_rows()) { + TupleRow* child_row = child_batch_->GetRow(child_row_idx_); + MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf, row_batch); + tuple_buf += tuple_desc_->byte_size(); + ++child_row_idx_; *eos = ReachedLimit(); - return Status::OK(); + if (*eos || row_batch->AtCapacity()) { + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + return Status::OK(); + } } // Fetch new batch if one is available, otherwise move on to next child. if (child_eos_) break; - child_row_batch_->Reset(); - RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_row_batch_.get(), + child_batch_->Reset(); + RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(), &child_eos_)); child_row_idx_ = 0; } @@ -160,7 +163,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { // transfered all resources. It is not OK to close the child above in the case when // ReachedLimit() is true as we may end up releasing resources that are referenced // by the output row_batch. - child_row_batch_.reset(); + child_batch_.reset(); // Unless we are inside a subplan expecting to call Open()/GetNext() on the child // again, the child can be closed at this point. @@ -168,97 +171,58 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { ++child_idx_; } - // Evaluate and materialize the const expr lists exactly once. - while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) { - // Only evaluate the const expr lists by the first fragment instance. - if (state->fragment_ctx().per_fragment_instance_idx == 0) { - // Materialize expr results into row_batch. 
- RETURN_IF_ERROR(EvalAndMaterializeExprs( - const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple, - row_batch)); + // Only evaluate the const expr lists by the first fragment instance. + if (state->fragment_ctx().per_fragment_instance_idx == 0) { + // Evaluate and materialize the const expr lists exactly once. + while (const_expr_list_idx_ < const_expr_lists_.size()) { + MaterializeExprs( + const_expr_lists_[const_expr_list_idx_], NULL, tuple_buf, row_batch); + tuple_buf += tuple_desc_->byte_size(); + ++const_expr_list_idx_; + *eos = ReachedLimit(); + if (*eos || row_batch->AtCapacity()) { + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + return Status::OK(); + } } - ++const_result_expr_idx_; - *eos = ReachedLimit(); - if (*eos || row_batch->AtCapacity()) return Status::OK(); } + COUNTER_SET(rows_returned_counter_, num_rows_returned_); *eos = true; return Status::OK(); } +void UnionNode::MaterializeExprs(const vector<ExprContext*>& exprs, + TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) { + DCHECK(!dst_batch->AtCapacity()); + Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf); + TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow()); + dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, + exprs, dst_batch->tuple_data_pool()); + dst_row->SetTuple(0, dst_tuple); + dst_batch->CommitLastRow(); + ++num_rows_returned_; +} + Status UnionNode::Reset(RuntimeState* state) { child_row_idx_ = 0; - const_result_expr_idx_ = 0; + const_expr_list_idx_ = 0; child_idx_ = 0; - child_row_batch_.reset(); + child_batch_.reset(); child_eos_ = false; return ExecNode::Reset(state); } void UnionNode::Close(RuntimeState* state) { if (is_closed()) return; - child_row_batch_.reset(); - for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) { - Expr::Close(const_result_expr_ctx_lists_[i], state); + child_batch_.reset(); + for (const vector<ExprContext*>& exprs : const_expr_lists_) { + Expr::Close(exprs, state); } - for (int i = 0; i < 
result_expr_ctx_lists_.size(); ++i) { - Expr::Close(result_expr_ctx_lists_[i], state); + for (const vector<ExprContext*>& exprs : child_expr_lists_) { + Expr::Close(exprs, state); } ExecNode::Close(state); } -Status UnionNode::EvalAndMaterializeExprs(const vector<ExprContext*>& ctxs, - bool const_exprs, Tuple** tuple, RowBatch* row_batch) { - // Make sure there are rows left in the batch. - if (!const_exprs && child_row_idx_ >= child_row_batch_->num_rows()) { - return Status::OK(); - } - // Execute the body at least once. - bool done = true; - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - int num_conjunct_ctxs = conjunct_ctxs_.size(); - - do { - TupleRow* child_row = NULL; - if (!const_exprs) { - DCHECK(child_row_batch_ != NULL); - // Non-const expr list. Fetch next row from batch. - child_row = child_row_batch_->GetRow(child_row_idx_); - ++child_row_idx_; - done = child_row_idx_ >= child_row_batch_->num_rows(); - } - - // Add a new row to the batch. - int row_idx = row_batch->AddRow(); - TupleRow* row = row_batch->GetRow(row_idx); - row->SetTuple(0, *tuple); - - // Materialize expr results into tuple. - DCHECK_EQ(ctxs.size(), tuple_desc_->slots().size()); - for (int i = 0; i < ctxs.size(); ++i) { - // our exprs correspond to materialized slots - SlotDescriptor* slot_desc = tuple_desc_->slots()[i]; - const void* value = ctxs[i]->GetValue(child_row); - RETURN_IF_ERROR(ctxs[i]->root()->GetFnContextError(ctxs[i])); - RawValue::Write(value, *tuple, slot_desc, row_batch->tuple_data_pool()); - } - - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, row)) { - row_batch->CommitLastRow(); - ++num_rows_returned_; - COUNTER_SET(rows_returned_counter_, num_rows_returned_); - char* new_tuple = reinterpret_cast<char*>(*tuple); - new_tuple += tuple_desc_->byte_size(); - *tuple = reinterpret_cast<Tuple*>(new_tuple); - } else { - // Make sure to reset null indicators since we're overwriting - // the tuple assembled for the previous row. 
- (*tuple)->Init(tuple_desc_->byte_size()); - } - if (row_batch->AtCapacity() || ReachedLimit()) return Status::OK(); - } while (!done); - - return Status::OK(); -} - } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e9a4077b/be/src/exec/union-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h index 65c8efc..cc835cd 100644 --- a/be/src/exec/union-node.h +++ b/be/src/exec/union-node.h @@ -22,14 +22,16 @@ #include <boost/scoped_ptr.hpp> #include "exec/exec-node.h" -#include "exprs/expr.h" -#include "runtime/mem-pool.h" -#include <boost/scoped_ptr.hpp> +#include "runtime/row-batch.h" namespace impala { +class DescriptorTbl; +class ExprContext; +class RuntimeState; class Tuple; class TupleRow; +class TPlanNode; /// Node that merges the results of its children by materializing their /// evaluated expressions into row batches. The UnionNode pulls row batches from its @@ -48,29 +50,27 @@ class UnionNode : public ExecNode { private: /// Tuple id resolved in Prepare() to set tuple_desc_; - int tuple_id_; + const int tuple_id_; /// Descriptor for tuples this union node constructs. const TupleDescriptor* tuple_desc_; /// Const exprs materialized by this node. These exprs don't refer to any children. - std::vector<std::vector<ExprContext*>> const_result_expr_ctx_lists_; + /// Only materialized by the first fragment instance to avoid duplication. + std::vector<std::vector<ExprContext*>> const_expr_lists_; /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. - std::vector<std::vector<ExprContext*>> result_expr_ctx_lists_; + std::vector<std::vector<ExprContext*>> child_expr_lists_; ///////////////////////////////////////// /// BEGIN: Members that must be Reset() - /// Index of current const result expr list. - int const_result_expr_idx_; - /// Index of current child. int child_idx_; /// Current row batch of current child. 
We reset the pointer to a new RowBatch /// when switching to a different child. - boost::scoped_ptr<RowBatch> child_row_batch_; + boost::scoped_ptr<RowBatch> child_batch_; /// Index of current row in child_row_batch_. int child_row_idx_; @@ -78,6 +78,9 @@ class UnionNode : public ExecNode { /// Saved from the last to GetNext() on the current child. bool child_eos_; + /// Index of current const result expr list. + int const_expr_list_idx_; + /// END: Members that must be Reset() ///////////////////////////////////////// @@ -85,15 +88,10 @@ class UnionNode : public ExecNode { /// and sets child_row_idx_ to 0. May set child_eos_. Status OpenCurrentChild(RuntimeState* state); - /// Evaluates exprs on all rows in child_row_batch_ starting from child_row_idx_, - /// and materializes their results into *tuple. - /// Adds *tuple into row_batch, and increments *tuple. - /// If const_exprs is true, then the exprs are evaluated exactly once without - /// fetching rows from child_row_batch_. - /// Only commits tuples to row_batch if they are not filtered by conjuncts. - /// Returns an error status if evaluating an expression results in one. - Status EvalAndMaterializeExprs(const std::vector<ExprContext*>& ctxs, - bool const_exprs, Tuple** tuple, RowBatch* row_batch); + /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'. + /// and appends the new tuple to 'dst_batch'. Increments 'num_rows_returned_'. + inline void MaterializeExprs(const std::vector<ExprContext*>& exprs, + TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch); }; }
