IMPALA-3586: Clean up union-node.h/cc to enable improvements.

This patch does not itself address IMPALA-3586, but it cleans up
the code in union-node.h/cc to make it easier to implement the
performance improvements tracked by that issue.

The major simplification is the removal of conjunct evaluation,
since the planner no longer assigns conjuncts to a union node
(UnionNode::Init() now DCHECKs this). Conjuncts are always pushed
down to the union operands.

Change-Id: Ia5fc23985e8d51acb8a6920717ce4e2f0254fe70
Reviewed-on: http://gerrit.cloudera.org:8080/4817
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e9a4077b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e9a4077b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e9a4077b

Branch: refs/heads/master
Commit: e9a4077b3589ee0a6debadb4b5efc13e05ace773
Parents: d802f32
Author: Alex Behm <[email protected]>
Authored: Thu Oct 20 21:16:33 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 01:00:00 2016 +0000

----------------------------------------------------------------------
 be/src/exec/union-node.cc | 192 +++++++++++++++++------------------------
 be/src/exec/union-node.h  |  36 ++++----
 2 files changed, 95 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e9a4077b/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 8b56099..f4d3b69 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -20,7 +20,7 @@
 #include "exprs/expr-context.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "runtime/raw-value.h"
+#include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
 #include "gen-cpp/PlanNodes_types.h"
@@ -30,33 +30,34 @@
 namespace impala {
 
 UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
-                     const DescriptorTbl& descs)
+    const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
       tuple_id_(tnode.union_node.tuple_id),
       tuple_desc_(NULL),
-      const_result_expr_idx_(0),
       child_idx_(0),
-      child_row_batch_(NULL),
+      child_batch_(NULL),
       child_row_idx_(0),
-      child_eos_(false) {
+      child_eos_(false),
+      const_expr_list_idx_(0) {
 }
 
 Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK(tnode.__isset.union_node);
+  DCHECK_EQ(conjunct_ctxs_.size(), 0);
   // Create const_expr_ctx_lists_ from thrift exprs.
   const vector<vector<TExpr>>& const_texpr_lists = 
tnode.union_node.const_expr_lists;
-  for (int i = 0; i < const_texpr_lists.size(); ++i) {
+  for (const vector<TExpr>& texprs : const_texpr_lists) {
     vector<ExprContext*> ctxs;
-    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, const_texpr_lists[i], &ctxs));
-    const_result_expr_ctx_lists_.push_back(ctxs);
+    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs));
+    const_expr_lists_.push_back(ctxs);
   }
   // Create result_expr_ctx_lists_ from thrift exprs.
   const vector<vector<TExpr>>& result_texpr_lists = 
tnode.union_node.result_expr_lists;
-  for (int i = 0; i < result_texpr_lists.size(); ++i) {
+  for (const vector<TExpr>& texprs : result_texpr_lists) {
     vector<ExprContext*> ctxs;
-    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, result_texpr_lists[i], 
&ctxs));
-    result_expr_ctx_lists_.push_back(ctxs);
+    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs));
+    child_expr_lists_.push_back(ctxs);
   }
   return Status::OK();
 }
@@ -68,19 +69,18 @@ Status UnionNode::Prepare(RuntimeState* state) {
   DCHECK(tuple_desc_ != NULL);
 
   // Prepare const expr lists.
-  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
-    RETURN_IF_ERROR(Expr::Prepare(
-        const_result_expr_ctx_lists_[i], state, row_desc(), 
expr_mem_tracker()));
-    AddExprCtxsToFree(const_result_expr_ctx_lists_[i]);
-    DCHECK_EQ(const_result_expr_ctx_lists_[i].size(), 
tuple_desc_->slots().size());
+  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
+    RETURN_IF_ERROR(Expr::Prepare(exprs, state, row_desc(), 
expr_mem_tracker()));
+    AddExprCtxsToFree(exprs);
+    DCHECK_EQ(exprs.size(), tuple_desc_->slots().size());
   }
 
   // Prepare result expr lists.
-  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
+  for (int i = 0; i < child_expr_lists_.size(); ++i) {
     RETURN_IF_ERROR(Expr::Prepare(
-        result_expr_ctx_lists_[i], state, child(i)->row_desc(), 
expr_mem_tracker()));
-    AddExprCtxsToFree(result_expr_ctx_lists_[i]);
-    DCHECK_EQ(result_expr_ctx_lists_[i].size(), tuple_desc_->slots().size());
+        child_expr_lists_[i], state, child(i)->row_desc(), 
expr_mem_tracker()));
+    AddExprCtxsToFree(child_expr_lists_[i]);
+    DCHECK_EQ(child_expr_lists_[i].size(), tuple_desc_->slots().size());
   }
   return Status::OK();
 }
@@ -89,12 +89,12 @@ Status UnionNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
   // Open const expr lists.
-  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
-    RETURN_IF_ERROR(Expr::Open(const_result_expr_ctx_lists_[i], state));
+  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
+    RETURN_IF_ERROR(Expr::Open(exprs, state));
   }
   // Open result expr lists.
-  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
-    RETURN_IF_ERROR(Expr::Open(result_expr_ctx_lists_[i], state));
+  for (const vector<ExprContext*>& exprs : child_expr_lists_) {
+    RETURN_IF_ERROR(Expr::Open(exprs, state));
   }
 
   // Open and fetch from the first child if there is one. Ensures that rows are
@@ -106,11 +106,11 @@ Status UnionNode::Open(RuntimeState* state) {
 
 Status UnionNode::OpenCurrentChild(RuntimeState* state) {
   DCHECK_LT(child_idx_, children_.size());
-  child_row_batch_.reset(new RowBatch(
+  child_batch_.reset(new RowBatch(
       child(child_idx_)->row_desc(), state->batch_size(), mem_tracker()));
   // Open child and fetch the first row batch.
   RETURN_IF_ERROR(child(child_idx_)->Open(state));
-  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_row_batch_.get(),
+  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(),
       &child_eos_));
   child_row_idx_ = 0;
   return Status::OK();
@@ -122,35 +122,38 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   // Create new tuple buffer for row_batch.
-  int64_t tuple_buffer_size;
-  uint8_t* tuple_buffer;
+  int64_t tuple_buf_size;
+  uint8_t* tuple_buf;
   RETURN_IF_ERROR(
-      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, 
&tuple_buffer));
-  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
-  tuple->Init(tuple_buffer_size);
+      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, 
&tuple_buf));
+  memset(tuple_buf, 0, tuple_buf_size);
 
   // Fetch from children, evaluate corresponding exprs and materialize.
   while (child_idx_ < children_.size()) {
     // Row batch was either never set or we're moving on to a different child.
-    if (child_row_batch_.get() == NULL) 
RETURN_IF_ERROR(OpenCurrentChild(state));
+    if (child_batch_.get() == NULL) RETURN_IF_ERROR(OpenCurrentChild(state));
 
-    // Start (or continue) consuming row batches from current child.
+    // Start or continue consuming row batches from current child.
     while (true) {
-      RETURN_IF_CANCELLED(state);
       RETURN_IF_ERROR(QueryMaintenance(state));
 
-      // Continue materializing exprs on child_row_batch_ into row batch.
-      
RETURN_IF_ERROR(EvalAndMaterializeExprs(result_expr_ctx_lists_[child_idx_], 
false,
-          &tuple, row_batch));
-      if (row_batch->AtCapacity() || ReachedLimit()) {
+      // Start or continue processing current child batch.
+      while (child_row_idx_ < child_batch_->num_rows()) {
+        TupleRow* child_row = child_batch_->GetRow(child_row_idx_);
+        MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf, 
row_batch);
+        tuple_buf += tuple_desc_->byte_size();
+        ++child_row_idx_;
         *eos = ReachedLimit();
-        return Status::OK();
+        if (*eos || row_batch->AtCapacity()) {
+          COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+          return Status::OK();
+        }
       }
 
       // Fetch new batch if one is available, otherwise move on to next child.
       if (child_eos_) break;
-      child_row_batch_->Reset();
-      RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_row_batch_.get(),
+      child_batch_->Reset();
+      RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(),
           &child_eos_));
       child_row_idx_ = 0;
     }
@@ -160,7 +163,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
     // transfered all resources. It is not OK to close the child above in the 
case when
     // ReachedLimit() is true as we may end up releasing resources that are 
referenced
     // by the output row_batch.
-    child_row_batch_.reset();
+    child_batch_.reset();
 
     // Unless we are inside a subplan expecting to call Open()/GetNext() on 
the child
     // again, the child can be closed at this point.
@@ -168,97 +171,58 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
     ++child_idx_;
   }
 
-  // Evaluate and materialize the const expr lists exactly once.
-  while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) {
-    // Only evaluate the const expr lists by the first fragment instance.
-    if (state->fragment_ctx().per_fragment_instance_idx == 0) {
-      // Materialize expr results into row_batch.
-      RETURN_IF_ERROR(EvalAndMaterializeExprs(
-          const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple,
-          row_batch));
+  // Only evaluate the const expr lists by the first fragment instance.
+  if (state->fragment_ctx().per_fragment_instance_idx == 0) {
+    // Evaluate and materialize the const expr lists exactly once.
+    while (const_expr_list_idx_ < const_expr_lists_.size()) {
+      MaterializeExprs(
+          const_expr_lists_[const_expr_list_idx_], NULL, tuple_buf, row_batch);
+      tuple_buf += tuple_desc_->byte_size();
+      ++const_expr_list_idx_;
+      *eos = ReachedLimit();
+      if (*eos || row_batch->AtCapacity()) {
+        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+        return Status::OK();
+      }
     }
-    ++const_result_expr_idx_;
-    *eos = ReachedLimit();
-    if (*eos || row_batch->AtCapacity()) return Status::OK();
   }
 
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   *eos = true;
   return Status::OK();
 }
 
+void UnionNode::MaterializeExprs(const vector<ExprContext*>& exprs,
+    TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
+  DCHECK(!dst_batch->AtCapacity());
+  Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+  TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
+  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_,
+      exprs, dst_batch->tuple_data_pool());
+  dst_row->SetTuple(0, dst_tuple);
+  dst_batch->CommitLastRow();
+  ++num_rows_returned_;
+}
+
 Status UnionNode::Reset(RuntimeState* state) {
   child_row_idx_ = 0;
-  const_result_expr_idx_ = 0;
+  const_expr_list_idx_ = 0;
   child_idx_ = 0;
-  child_row_batch_.reset();
+  child_batch_.reset();
   child_eos_ = false;
   return ExecNode::Reset(state);
 }
 
 void UnionNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  child_row_batch_.reset();
-  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
-    Expr::Close(const_result_expr_ctx_lists_[i], state);
+  child_batch_.reset();
+  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
+    Expr::Close(exprs, state);
   }
-  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
-    Expr::Close(result_expr_ctx_lists_[i], state);
+  for (const vector<ExprContext*>& exprs : child_expr_lists_) {
+    Expr::Close(exprs, state);
   }
   ExecNode::Close(state);
 }
 
-Status UnionNode::EvalAndMaterializeExprs(const vector<ExprContext*>& ctxs,
-    bool const_exprs, Tuple** tuple, RowBatch* row_batch) {
-  // Make sure there are rows left in the batch.
-  if (!const_exprs && child_row_idx_ >= child_row_batch_->num_rows()) {
-    return Status::OK();
-  }
-  // Execute the body at least once.
-  bool done = true;
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  int num_conjunct_ctxs = conjunct_ctxs_.size();
-
-  do {
-    TupleRow* child_row = NULL;
-    if (!const_exprs) {
-      DCHECK(child_row_batch_ != NULL);
-      // Non-const expr list. Fetch next row from batch.
-      child_row = child_row_batch_->GetRow(child_row_idx_);
-      ++child_row_idx_;
-      done = child_row_idx_ >= child_row_batch_->num_rows();
-    }
-
-    // Add a new row to the batch.
-    int row_idx = row_batch->AddRow();
-    TupleRow* row = row_batch->GetRow(row_idx);
-    row->SetTuple(0, *tuple);
-
-    // Materialize expr results into tuple.
-    DCHECK_EQ(ctxs.size(), tuple_desc_->slots().size());
-    for (int i = 0; i < ctxs.size(); ++i) {
-      // our exprs correspond to materialized slots
-      SlotDescriptor* slot_desc = tuple_desc_->slots()[i];
-      const void* value = ctxs[i]->GetValue(child_row);
-      RETURN_IF_ERROR(ctxs[i]->root()->GetFnContextError(ctxs[i]));
-      RawValue::Write(value, *tuple, slot_desc, row_batch->tuple_data_pool());
-    }
-
-    if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, row)) {
-      row_batch->CommitLastRow();
-      ++num_rows_returned_;
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-      char* new_tuple = reinterpret_cast<char*>(*tuple);
-      new_tuple += tuple_desc_->byte_size();
-      *tuple = reinterpret_cast<Tuple*>(new_tuple);
-    } else {
-      // Make sure to reset null indicators since we're overwriting
-      // the tuple assembled for the previous row.
-      (*tuple)->Init(tuple_desc_->byte_size());
-    }
-    if (row_batch->AtCapacity() || ReachedLimit()) return Status::OK();
-  } while (!done);
-
-  return Status::OK();
-}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e9a4077b/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index 65c8efc..cc835cd 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -22,14 +22,16 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "exec/exec-node.h"
-#include "exprs/expr.h"
-#include "runtime/mem-pool.h"
-#include <boost/scoped_ptr.hpp>
+#include "runtime/row-batch.h"
 
 namespace impala {
 
+class DescriptorTbl;
+class ExprContext;
+class RuntimeState;
 class Tuple;
 class TupleRow;
+class TPlanNode;
 
 /// Node that merges the results of its children by materializing their
 /// evaluated expressions into row batches. The UnionNode pulls row batches 
from its
@@ -48,29 +50,27 @@ class UnionNode : public ExecNode {
 
  private:
   /// Tuple id resolved in Prepare() to set tuple_desc_;
-  int tuple_id_;
+  const int tuple_id_;
 
   /// Descriptor for tuples this union node constructs.
   const TupleDescriptor* tuple_desc_;
 
   /// Const exprs materialized by this node. These exprs don't refer to any 
children.
-  std::vector<std::vector<ExprContext*>> const_result_expr_ctx_lists_;
+  /// Only materialized by the first fragment instance to avoid duplication.
+  std::vector<std::vector<ExprContext*>> const_expr_lists_;
 
   /// Exprs materialized by this node. The i-th result expr list refers to the 
i-th child.
-  std::vector<std::vector<ExprContext*>> result_expr_ctx_lists_;
+  std::vector<std::vector<ExprContext*>> child_expr_lists_;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// Index of current const result expr list.
-  int const_result_expr_idx_;
-
   /// Index of current child.
   int child_idx_;
 
   /// Current row batch of current child. We reset the pointer to a new 
RowBatch
   /// when switching to a different child.
-  boost::scoped_ptr<RowBatch> child_row_batch_;
+  boost::scoped_ptr<RowBatch> child_batch_;
 
   /// Index of current row in child_row_batch_.
   int child_row_idx_;
@@ -78,6 +78,9 @@ class UnionNode : public ExecNode {
   /// Saved from the last to GetNext() on the current child.
   bool child_eos_;
 
+  /// Index of current const result expr list.
+  int const_expr_list_idx_;
+
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
@@ -85,15 +88,10 @@ class UnionNode : public ExecNode {
   /// and sets child_row_idx_ to 0. May set child_eos_.
   Status OpenCurrentChild(RuntimeState* state);
 
-  /// Evaluates exprs on all rows in child_row_batch_ starting from 
child_row_idx_,
-  /// and materializes their results into *tuple.
-  /// Adds *tuple into row_batch, and increments *tuple.
-  /// If const_exprs is true, then the exprs are evaluated exactly once without
-  /// fetching rows from child_row_batch_.
-  /// Only commits tuples to row_batch if they are not filtered by conjuncts.
-  /// Returns an error status if evaluating an expression results in one.
-  Status EvalAndMaterializeExprs(const std::vector<ExprContext*>& ctxs,
-      bool const_exprs, Tuple** tuple, RowBatch* row_batch);
+  /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'.
+  /// and appends the new tuple to 'dst_batch'. Increments 
'num_rows_returned_'.
+  inline void MaterializeExprs(const std::vector<ExprContext*>& exprs,
+      TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch);
 };
 
 }

Reply via email to