IMPALA-3586: Implement union passthrough

The union node acts as pass through operator and forwards row batches
from it's children without materializing. This is done in the case
when the child's tuple layout is identical to union node tuple layout
and no functions need to be applied to the child row batches.

Removed operand reordering in the FE because it's simpler and safer to
handle all passthrough children before non-passthrough children in the
BE. The recent improvements to memory management allowed us to remove
this requirement.

Testing:
- Added new planner and end to end tests that cover the new
  functionality.
- Updated existing tests to reflect the new behavior.

Perf:
Ran a benchmark on a local 10 GB tpcds dataset. I used an unpartitioned
version of the store_sales table. There was over a 2x performance
improvement for the following query:

SELECT
  COUNT(ss_sold_time_sk),
  COUNT(ss_item_sk),
  COUNT(ss_customer_sk),
  COUNT(ss_cdemo_sk),
  COUNT(ss_hdemo_sk),
  COUNT(ss_addr_sk),
  COUNT(ss_store_sk),
  COUNT(ss_promo_sk),
  COUNT(ss_ticket_number),
  COUNT(ss_quantity),
  COUNT(ss_wholesale_cost),
  COUNT(ss_list_price),
  COUNT(ss_sales_price),
  COUNT(ss_ext_discount_amt),
  COUNT(ss_ext_sales_price),
  COUNT(ss_ext_wholesale_cost),
  COUNT(ss_ext_list_price),
  COUNT(ss_ext_tax),
  COUNT(ss_coupon_amt),
  COUNT(ss_net_paid),
  COUNT(ss_net_paid_inc_tax),
  COUNT(ss_net_profit),
  COUNT(ss_sold_date_sk)
FROM (
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
  union all
  select * from tpcds_10_parquet.store_sales_unpartitioned
) t

Before:
Total Time: 43s164ms

Summary:
Operator          #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  
Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE           1  224.721us  224.721us        1           1   28.00 KB  
      -1.00 B  FINALIZE
12:EXCHANGE            1   24.578us   24.578us        3           1          0  
      -1.00 B  UNPARTITIONED
11:AGGREGATE           3    2s402ms    3s060ms        3           1  119.00 KB  
     10.00 MB
00:UNION               3   35s380ms   37s846ms  288.01M     288.01M    3.08 MB  
            0
|--02:SCAN HDFS        3  184.197ms  219.931ms   28.80M      28.80M  535.03 MB  
      1.88 GB  store_sales_unpartitioned
|--03:SCAN HDFS        3  131.956ms  153.401ms   28.80M      28.80M  534.98 MB  
      1.88 GB  store_sales_unpartitioned
|--04:SCAN HDFS        3  178.456ms  247.721ms   28.80M      28.80M  534.98 MB  
      1.88 GB  store_sales_unpartitioned
|--05:SCAN HDFS        3  189.398ms  242.251ms   28.80M      28.80M  535.01 MB  
      1.88 GB  store_sales_unpartitioned
|--06:SCAN HDFS        3  122.786ms  156.528ms   28.80M      28.80M  534.98 MB  
      1.88 GB  store_sales_unpartitioned
|--07:SCAN HDFS        3  147.467ms  183.391ms   28.80M      28.80M  535.13 MB  
      1.88 GB  store_sales_unpartitioned
|--08:SCAN HDFS        3  147.502ms  186.273ms   28.80M      28.80M  535.01 MB  
      1.88 GB  store_sales_unpartitioned
|--09:SCAN HDFS        3  130.086ms  154.682ms   28.80M      28.80M  535.04 MB  
      1.88 GB  store_sales_unpartitioned
|--10:SCAN HDFS        3  122.701ms  161.056ms   28.80M      28.80M  534.89 MB  
      1.88 GB  store_sales_unpartitioned
01:SCAN HDFS           3  287.863ms  330.436ms   28.80M      28.80M  534.98 MB  
      1.88 GB  store_sales_unpartitioned

After:
Total Time: 19s139ms

Summary:
Operator          #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  
Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE           1  166.241us  166.241us        1           1   28.00 KB  
      -1.00 B  FINALIZE
12:EXCHANGE            1   71.695us   71.695us        3           1          0  
      -1.00 B  UNPARTITIONED
11:AGGREGATE           3    2s971ms    3s809ms        3           1    3.08 MB  
     10.00 MB
00:UNION               3  207.956ms  222.846ms  288.01M     288.01M          0  
            0
|--02:SCAN HDFS        3    1s533ms    1s535ms   28.80M      28.80M  532.28 MB  
      1.88 GB  store_sales_unpartitioned
|--03:SCAN HDFS        3    1s554ms    1s669ms   28.80M      28.80M  525.73 MB  
      1.88 GB  store_sales_unpartitioned
|--04:SCAN HDFS        3    1s568ms    1s716ms   28.80M      28.80M  525.03 MB  
      1.88 GB  store_sales_unpartitioned
|--05:SCAN HDFS        3    1s503ms    1s617ms   28.80M      28.80M  527.43 MB  
      1.88 GB  store_sales_unpartitioned
|--06:SCAN HDFS        3    1s560ms    1s634ms   28.80M      28.80M  528.52 MB  
      1.88 GB  store_sales_unpartitioned
|--07:SCAN HDFS        3    1s489ms    1s643ms   28.80M      28.80M  534.81 MB  
      1.88 GB  store_sales_unpartitioned
|--08:SCAN HDFS        3    1s534ms    1s581ms   28.80M      28.80M  528.10 MB  
      1.88 GB  store_sales_unpartitioned
|--09:SCAN HDFS        3    1s558ms    1s674ms   28.80M      28.80M  526.77 MB  
      1.88 GB  store_sales_unpartitioned
|--10:SCAN HDFS        3    1s504ms    1s692ms   28.80M      28.80M  527.83 MB  
      1.88 GB  store_sales_unpartitioned
01:SCAN HDFS           3    1s682ms    1s911ms   28.80M      28.80M  526.14 MB  
      1.88 GB  store_sales_unpartitioned

Change-Id: Ia8f6d5062724ba5b78174c3227a7a796d10d8416
Reviewed-on: http://gerrit.cloudera.org:8080/5816
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a50c3440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a50c3440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a50c3440

Branch: refs/heads/master
Commit: a50c344077f6c9bbea3d3cbaa2e9146ba20ac9a9
Parents: 5deec60
Author: Taras Bobrovytsky <[email protected]>
Authored: Wed Jan 4 14:33:08 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Mar 21 22:24:01 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc                    |    2 +-
 be/src/exec/union-node.cc                       |  224 ++--
 be/src/exec/union-node.h                        |   60 +-
 be/src/runtime/buffered-tuple-stream.cc         |    2 +-
 be/src/runtime/descriptors.cc                   |   33 +-
 be/src/runtime/descriptors.h                    |   20 +
 be/src/runtime/row-batch.cc                     |    2 +-
 common/thrift/ImpalaService.thrift              |    4 +-
 common/thrift/PlanNodes.thrift                  |    2 +
 .../apache/impala/analysis/SlotDescriptor.java  |   14 +
 .../apache/impala/analysis/TupleDescriptor.java |   12 -
 .../org/apache/impala/analysis/UnionStmt.java   |   17 +-
 .../impala/planner/DistributedPlanner.java      |    1 -
 .../impala/planner/SingleNodePlanner.java       |    3 +-
 .../org/apache/impala/planner/UnionNode.java    |  159 ++-
 .../impala/analysis/AnalyzeStmtsTest.java       |    2 +-
 .../queries/PlannerTest/aggregation.test        |   16 +
 .../queries/PlannerTest/analytic-fns.test       |  104 +-
 .../queries/PlannerTest/empty.test              |   16 +-
 .../queries/PlannerTest/inline-view.test        |    2 +
 .../queries/PlannerTest/kudu.test               |   30 +
 .../queries/PlannerTest/order.test              |  180 +--
 .../PlannerTest/predicate-propagation.test      |    2 +
 .../PlannerTest/runtime-filter-propagation.test |    3 +
 .../queries/PlannerTest/small-query-opt.test    |   25 +-
 .../queries/PlannerTest/subquery-rewrite.test   |    2 +
 .../queries/PlannerTest/topn.test               |    6 +
 .../queries/PlannerTest/union.test              | 1110 +++++++++++-------
 .../queries/PlannerTest/with-clause.test        |   12 +-
 .../queries/QueryTest/nested-types-subplan.test |   12 +
 .../queries/QueryTest/union.test                |   77 +-
 tests/query_test/test_queries.py                |   13 +
 32 files changed, 1432 insertions(+), 735 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 0f86339..6b5e52b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -193,7 +193,7 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* 
output_batch, bool*
     *eos = (input_batch_ == NULL);
     if (*eos) return Status::OK();
     next_row_idx_ = 0;
-    DCHECK(input_batch_->row_desc().IsPrefixOf(output_batch->row_desc()));
+    
DCHECK(input_batch_->row_desc().LayoutIsPrefixOf(output_batch->row_desc()));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 5457737..bff7435 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -33,13 +33,14 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& 
tnode,
     const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
       tuple_id_(tnode.union_node.tuple_id),
-      tuple_desc_(NULL),
+      tuple_desc_(nullptr),
+      
first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
       child_idx_(0),
-      child_batch_(NULL),
+      child_batch_(nullptr),
       child_row_idx_(0),
       child_eos_(false),
-      const_expr_list_idx_(0) {
-}
+      const_expr_list_idx_(0),
+      to_close_child_idx_(-1) { }
 
 Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
@@ -66,7 +67,7 @@ Status UnionNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
-  DCHECK(tuple_desc_ != NULL);
+  DCHECK(tuple_desc_ != nullptr);
 
   // Prepare const expr lists.
   for (const vector<ExprContext*>& exprs : const_expr_lists_) {
@@ -97,98 +98,170 @@ Status UnionNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::Open(exprs, state));
   }
 
-  // Open and fetch from the first child if there is one. Ensures that rows are
-  // available for clients to fetch after this Open() has succeeded.
-  if (!children_.empty()) RETURN_IF_ERROR(OpenCurrentChild(state));
+  // Ensures that rows are available for clients to fetch after this Open() has
+  // succeeded.
+  if (!children_.empty()) RETURN_IF_ERROR(child(child_idx_)->Open(state));
 
   return Status::OK();
 }
 
-Status UnionNode::OpenCurrentChild(RuntimeState* state) {
+Status UnionNode::GetNextPassThrough(RuntimeState* state, RowBatch* row_batch) 
{
+  DCHECK(!ReachedLimit());
+  DCHECK(!IsInSubplan());
   DCHECK_LT(child_idx_, children_.size());
-  child_batch_.reset(new RowBatch(
-      child(child_idx_)->row_desc(), state->batch_size(), mem_tracker()));
-  // Open child and fetch the first row batch.
-  RETURN_IF_ERROR(child(child_idx_)->Open(state));
-  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(),
-      &child_eos_));
-  child_row_idx_ = 0;
+  DCHECK(IsChildPassthrough(child_idx_));
+  DCHECK(child(child_idx_)->row_desc().LayoutEquals(row_batch->row_desc()));
+  if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
+  DCHECK_EQ(row_batch->num_rows(), 0);
+  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, row_batch, &child_eos_));
+  if (limit_ != -1 && num_rows_returned_ + row_batch->num_rows() > limit_) {
+    row_batch->set_num_rows(limit_ - num_rows_returned_);
+  }
+  num_rows_returned_ += row_batch->num_rows();
+  DCHECK(limit_ == -1 || num_rows_returned_ <= limit_);
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  if (child_eos_) {
+    // Even though the child is at eos, it's not OK to Close() it here. Once 
we close
+    // the child, the row batches that it produced are invalid. Marking the 
batch as
+    // needing a deep copy let's us safely close the child in the next 
GetNext() call.
+    // TODO: Remove this as part of IMPALA-4179.
+    row_batch->MarkNeedsDeepCopy();
+    to_close_child_idx_ = child_idx_;
+    ++child_idx_;
+  }
   return Status::OK();
 }
 
-Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) 
{
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
-  // Create new tuple buffer for row_batch.
+Status UnionNode::GetNextMaterialized(RuntimeState* state, RowBatch* 
row_batch) {
+  // Fetch from children, evaluate corresponding exprs and materialize.
+  DCHECK(!ReachedLimit());
+  DCHECK_LT(child_idx_, children_.size());
   int64_t tuple_buf_size;
   uint8_t* tuple_buf;
   RETURN_IF_ERROR(
       row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, 
&tuple_buf));
   memset(tuple_buf, 0, tuple_buf_size);
 
-  // Fetch from children, evaluate corresponding exprs and materialize.
-  while (child_idx_ < children_.size()) {
-    // Row batch was either never set or we're moving on to a different child.
-    if (child_batch_.get() == NULL) RETURN_IF_ERROR(OpenCurrentChild(state));
-
-    // Start or continue consuming row batches from current child.
-    while (true) {
-      RETURN_IF_ERROR(QueryMaintenance(state));
-
-      // Start or continue processing current child batch.
-      while (child_row_idx_ < child_batch_->num_rows()) {
-        TupleRow* child_row = child_batch_->GetRow(child_row_idx_);
-        MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf, 
row_batch);
-        tuple_buf += tuple_desc_->byte_size();
-        ++child_row_idx_;
-        *eos = ReachedLimit();
-        if (*eos || row_batch->AtCapacity()) {
-          COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-          return Status::OK();
-        }
-      }
-
-      // Fetch new batch if one is available, otherwise move on to next child.
-      if (child_eos_) break;
-      child_batch_->Reset();
-      RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_batch_.get(),
-          &child_eos_));
+  while (HasMoreMaterialized() && !row_batch->AtCapacity()) {
+    // There are only 2 ways of getting out of this loop:
+    // 1. The loop ends normally when we are either done iterating over the 
children that
+    //    need materialization or the row batch is at capacity.
+    // 2. We return from the function from inside the loop if limit is reached.
+    DCHECK(!IsChildPassthrough(child_idx_));
+    // Child row batch was either never set or we're moving on to a different 
child.
+    if (child_batch_.get() == nullptr) {
+      DCHECK_LT(child_idx_, children_.size());
+      child_batch_.reset(new RowBatch(
+          child(child_idx_)->row_desc(), state->batch_size(), mem_tracker()));
       child_row_idx_ = 0;
+      // Open the current child unless it's the first child, which was already 
opened in
+      // UnionNode::Open().
+      if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
+      // The first batch from each child is always fetched here.
+      RETURN_IF_ERROR(child(child_idx_)->GetNext(
+          state, child_batch_.get(), &child_eos_));
     }
 
-    // Close current child and move on to next one. It is OK to close the 
child as
-    // long as all RowBatches have already been consumed so that we are sure 
to have
-    // transfered all resources. It is not OK to close the child above in the 
case when
-    // ReachedLimit() is true as we may end up releasing resources that are 
referenced
-    // by the output row_batch.
-    child_batch_.reset();
-
-    // Unless we are inside a subplan expecting to call Open()/GetNext() on 
the child
-    // again, the child can be closed at this point.
-    if (!IsInSubplan()) child(child_idx_)->Close(state);
-    ++child_idx_;
-  }
-
-  // Only evaluate the const expr lists by the first fragment instance.
-  if (state->instance_ctx().per_fragment_instance_idx == 0) {
-    // Evaluate and materialize the const expr lists exactly once.
-    while (const_expr_list_idx_ < const_expr_lists_.size()) {
-      MaterializeExprs(
-          const_expr_lists_[const_expr_list_idx_], NULL, tuple_buf, row_batch);
+    while (!row_batch->AtCapacity()) {
+      // This loop fetches row batches from a single child and materializes 
each output
+      // row, until one of these conditions:
+      // 1. The loop ends normally if the row batch is at capacity.
+      // 2. We break out of the loop if all the rows were consumed from the 
current child
+      //    and we are moving on to the next child.
+      // 3. We return from the function from inside the loop if the limit is 
reached.
+      DCHECK(child_batch_.get() != nullptr);
+      DCHECK_LE(child_row_idx_, child_batch_->num_rows());
+      if (child_row_idx_ == child_batch_->num_rows()) {
+        // Move on to the next child if it is at eos.
+        if (child_eos_) break;
+        // Fetch more rows from the child.
+        child_batch_->Reset();
+        child_row_idx_ = 0;
+        // All batches except the first batch from each child are fetched here.
+        RETURN_IF_ERROR(child(child_idx_)->GetNext(
+            state, child_batch_.get(), &child_eos_));
+        // If we fetched an empty batch, go back to the beginning of this 
while loop, and
+        // try again.
+        if (child_batch_->num_rows() == 0) continue;
+      }
+      DCHECK_LT(child_row_idx_, child_batch_->num_rows());
+      TupleRow* child_row = child_batch_->GetRow(child_row_idx_);
+      MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf, 
row_batch);
       tuple_buf += tuple_desc_->byte_size();
-      ++const_expr_list_idx_;
-      *eos = ReachedLimit();
-      if (*eos || row_batch->AtCapacity()) {
+      ++child_row_idx_;
+      if (ReachedLimit()) {
         COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+        // It's OK to close the child here even if we are inside a subplan.
+        child_batch_.reset();
+        child(child_idx_)->Close(state);
         return Status::OK();
       }
     }
+
+    DCHECK(!ReachedLimit());
+    if (child_eos_ && child_row_idx_ == child_batch_->num_rows()) {
+      // Unless we are inside a subplan expecting to call Open()/GetNext() on 
the child
+      // again, the child can be closed at this point.
+      child_batch_.reset();
+      if (!IsInSubplan()) child(child_idx_)->Close(state);
+      ++child_idx_;
+    } else {
+      // If we haven't finished consuming rows from the current child, we must 
have ended
+      // up here because the row batch is at capacity.
+      DCHECK(row_batch->AtCapacity());
+    }
+  }
+
+  DCHECK_LE(child_idx_, children_.size());
+  return Status::OK();
+}
+
+Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) {
+  DCHECK_EQ(state->instance_ctx().per_fragment_instance_idx, 0);
+  DCHECK_LT(const_expr_list_idx_, const_expr_lists_.size());
+  // Create new tuple buffer for row_batch.
+  int64_t tuple_buf_size;
+  uint8_t* tuple_buf;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, 
&tuple_buf));
+  memset(tuple_buf, 0, tuple_buf_size);
+  while (const_expr_list_idx_ < const_expr_lists_.size() &&
+      !row_batch->AtCapacity() && !ReachedLimit()) {
+    MaterializeExprs(
+        const_expr_lists_[const_expr_list_idx_], nullptr, tuple_buf, 
row_batch);
+    tuple_buf += tuple_desc_->byte_size();
+    ++const_expr_list_idx_;
   }
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-  *eos = true;
+  return Status::OK();
+}
+
+Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) 
{
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  if (to_close_child_idx_ != -1) {
+    // The previous child needs to be closed if passthrough was enabled for 
it. In the non
+    // passthrough case, the child was already closed in the previous call to 
GetNext().
+    DCHECK(IsChildPassthrough(to_close_child_idx_));
+    child(to_close_child_idx_)->Close(state);
+    to_close_child_idx_ = -1;
+  }
+
+  if (HasMorePassthrough()) {
+    RETURN_IF_ERROR(GetNextPassThrough(state, row_batch));
+  } else if (HasMoreMaterialized()) {
+    RETURN_IF_ERROR(GetNextMaterialized(state, row_batch));
+  } else if (HasMoreConst(state)) {
+    RETURN_IF_ERROR(GetNextConst(state, row_batch));
+  }
+
+  *eos = ReachedLimit() ||
+      (!HasMorePassthrough() && !HasMoreMaterialized() && 
!HasMoreConst(state));
+
   return Status::OK();
 }
 
@@ -205,11 +278,14 @@ void UnionNode::MaterializeExprs(const 
vector<ExprContext*>& exprs,
 }
 
 Status UnionNode::Reset(RuntimeState* state) {
-  child_row_idx_ = 0;
-  const_expr_list_idx_ = 0;
   child_idx_ = 0;
   child_batch_.reset();
+  child_row_idx_ = 0;
   child_eos_ = false;
+  const_expr_list_idx_ = 0;
+  // Since passthrough is disabled in subplans, verify that there is no 
passthrough
+  // child that needs to be closed.
+  DCHECK_EQ(to_close_child_idx_, -1);
   return ExecNode::Reset(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index cc835cd..e1474d0 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -23,20 +23,23 @@
 
 #include "exec/exec-node.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
 
 namespace impala {
 
 class DescriptorTbl;
 class ExprContext;
-class RuntimeState;
 class Tuple;
 class TupleRow;
 class TPlanNode;
 
-/// Node that merges the results of its children by materializing their
-/// evaluated expressions into row batches. The UnionNode pulls row batches 
from its
-/// children sequentially, i.e., it exhausts one child completely before moving
-/// on to the next one.
+/// Node that merges the results of its children by either materializing their
+/// evaluated expressions into row batches or passing through (forwarding) the
+/// batches if the input tuple layout is identical to the output tuple layout
+/// and expressions don't need to be evaluated. The children should be ordered
+/// such that all passthrough children come before the children that need
+/// materialization. The union node pulls from its children sequentially, i.e.
+/// it exhausts one child completely before moving on to the next one.
 class UnionNode : public ExecNode {
  public:
   UnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
@@ -55,6 +58,11 @@ class UnionNode : public ExecNode {
   /// Descriptor for tuples this union node constructs.
   const TupleDescriptor* tuple_desc_;
 
+  /// Index of the first non-passthrough child; i.e. a child that needs 
materialization.
+  /// 0 when all children are materialized, 'children_.size()' when no 
children are
+  /// materialized.
+  const int first_materialized_child_idx_;
+
   /// Const exprs materialized by this node. These exprs don't refer to any 
children.
   /// Only materialized by the first fragment instance to avoid duplication.
   std::vector<std::vector<ExprContext*>> const_expr_lists_;
@@ -81,17 +89,53 @@ class UnionNode : public ExecNode {
   /// Index of current const result expr list.
   int const_expr_list_idx_;
 
+  /// Index of the child that needs to be closed on the next GetNext() call. 
Should be set
+  /// to -1 if no child needs to be closed.
+  int to_close_child_idx_;
+
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
-  /// Opens the child at child_idx_, fetches the first batch into 
child_row_batch_,
-  /// and sets child_row_idx_ to 0. May set child_eos_.
-  Status OpenCurrentChild(RuntimeState* state);
+  /// GetNext() for the passthrough case. We pass 'row_batch' directly into 
the GetNext()
+  /// call on the child.
+  Status GetNextPassThrough(RuntimeState* state, RowBatch* row_batch);
+
+  /// GetNext() for the materialized case. Materializes and evaluates rows 
from each
+  /// non-passthrough child.
+  Status GetNextMaterialized(RuntimeState* state, RowBatch* row_batch);
+
+  /// GetNext() for the constant expression case.
+  Status GetNextConst(RuntimeState* state, RowBatch* row_batch);
 
   /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'.
   /// and appends the new tuple to 'dst_batch'. Increments 
'num_rows_returned_'.
   inline void MaterializeExprs(const std::vector<ExprContext*>& exprs,
       TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch);
+
+  /// Returns true if the child at 'child_idx' can be passed through.
+  inline bool IsChildPassthrough(int child_idx) const {
+    DCHECK_LT(child_idx, children_.size());
+    return child_idx < first_materialized_child_idx_;
+  }
+
+  /// Returns true if there are still rows to be returned from passthrough 
children.
+  inline bool HasMorePassthrough() const {
+    return child_idx_ < first_materialized_child_idx_;
+  }
+
+  /// Returns true if there are still rows to be returned from children that 
need
+  /// materialization.
+  inline bool HasMoreMaterialized() const {
+    return first_materialized_child_idx_ != children_.size() &&
+        child_idx_ < children_.size();
+  }
+
+  /// Returns true if there are still rows to be returned from constant 
expressions.
+  inline bool HasMoreConst(const RuntimeState* state) const {
+    return state->instance_ctx().per_fragment_instance_idx == 0 &&
+        const_expr_list_idx_ < const_expr_lists_.size();
+  }
+
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index 86695f5..bfe933c 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -547,7 +547,7 @@ template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
 Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
     vector<RowIdx>* indices) {
   DCHECK(!closed_);
-  DCHECK(batch->row_desc().Equals(desc_));
+  DCHECK(batch->row_desc().LayoutEquals(desc_));
   *eos = (rows_returned_ == num_rows_);
   if (*eos) return Status::OK();
   DCHECK_GE(read_block_null_indicators_size_, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index f8ec79c..5f71bf7 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -128,6 +128,15 @@ string SlotDescriptor::DebugString() const {
   return out.str();
 }
 
+bool SlotDescriptor::LayoutEquals(const SlotDescriptor& other_desc) const {
+  if (type() != other_desc.type()) return false;
+  if (is_nullable() != other_desc.is_nullable()) return false;
+  if (slot_size() != other_desc.slot_size()) return false;
+  if (tuple_offset() != other_desc.tuple_offset()) return false;
+  if (!null_indicator_offset().Equals(other_desc.null_indicator_offset())) 
return false;
+  return true;
+}
+
 ColumnDescriptor::ColumnDescriptor(const TColumnDescriptor& tdesc)
   : name_(tdesc.name),
     type_(ColumnType::FromThrift(tdesc.type)) {
@@ -333,6 +342,17 @@ string TupleDescriptor::DebugString() const {
   return out.str();
 }
 
+bool TupleDescriptor::LayoutEquals(const TupleDescriptor& other_desc) const {
+  const vector<SlotDescriptor*>& slots = this->slots();
+  const vector<SlotDescriptor*>& other_slots = other_desc.slots();
+  if (this->byte_size() != other_desc.byte_size()) return false;
+  if (slots.size() != other_slots.size()) return false;
+  for (int i = 0; i < slots.size(); ++i) {
+    if (!slots[i]->LayoutEquals(*other_slots[i])) return false;
+  }
+  return true;
+}
+
 RowDescriptor::RowDescriptor(const DescriptorTbl& desc_tbl,
                              const vector<TTupleId>& row_tuples,
                              const vector<bool>& nullable_tuples)
@@ -436,13 +456,22 @@ bool RowDescriptor::IsPrefixOf(const RowDescriptor& 
other_desc) const {
 
 bool RowDescriptor::Equals(const RowDescriptor& other_desc) const {
   if (tuple_desc_map_.size() != other_desc.tuple_desc_map_.size()) return 
false;
+  return IsPrefixOf(other_desc);
+}
+
+bool RowDescriptor::LayoutIsPrefixOf(const RowDescriptor& other_desc) const {
+  if (tuple_desc_map_.size() > other_desc.tuple_desc_map_.size()) return false;
   for (int i = 0; i < tuple_desc_map_.size(); ++i) {
-    // pointer comparison okay, descriptors are unique
-    if (tuple_desc_map_[i] != other_desc.tuple_desc_map_[i]) return false;
+    if (!tuple_desc_map_[i]->LayoutEquals(*other_desc.tuple_desc_map_[i])) 
return false;
   }
   return true;
 }
 
+bool RowDescriptor::LayoutEquals(const RowDescriptor& other_desc) const {
+  if (tuple_desc_map_.size() != other_desc.tuple_desc_map_.size()) return 
false;
+  return LayoutIsPrefixOf(other_desc);
+}
+
 string RowDescriptor::DebugString() const {
   stringstream ss;
   for (int i = 0; i < tuple_desc_map_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 4a50800..3fd6942 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -95,6 +95,10 @@ struct NullIndicatorOffset {
       bit_mask(bit_offset == -1 ? 0 : 1 << bit_offset) {
   }
 
+  bool Equals(const NullIndicatorOffset& o) const {
+    return this->byte_offset == o.byte_offset && this->bit_mask == o.bit_mask;
+  }
+
   std::string DebugString() const;
 };
 
@@ -135,6 +139,10 @@ class SlotDescriptor {
 
   std::string DebugString() const;
 
+  /// Return true if the physical layout of this descriptor matches the 
physical layout
+  /// of other_desc, but not necessarily ids.
+  bool LayoutEquals(const SlotDescriptor& other_desc) const;
+
   /// Generate LLVM code at the insert position of 'builder' that returns a 
boolean value
   /// represented as a LLVM i1 indicating whether this slot is null in 'tuple'.
   llvm::Value* CodegenIsNull(
@@ -397,6 +405,10 @@ class TupleDescriptor {
   /// Returns true if this tuple or any nested collection item tuples have 
string slots.
   bool ContainsStringData() const;
 
+  /// Return true if the physical layout of this descriptor matches that of 
other_desc,
+  /// but not necessarily the id.
+  bool LayoutEquals(const TupleDescriptor& other_desc) const;
+
   /// Creates a typed struct description for llvm.  The layout of the struct 
is computed
   /// by the FE which includes the order of the fields in the resulting struct.
   /// Returns the struct type or NULL if the type could not be created.
@@ -541,6 +553,14 @@ class RowDescriptor {
   /// Return true if the tuple ids of this descriptor match tuple ids of other 
desc.
   bool Equals(const RowDescriptor& other_desc) const;
 
+  /// Return true if the tuples of this descriptor are a prefix of the tuples 
of
+  /// other_desc. Tuples are compared by their physical layout and not by ids.
+  bool LayoutIsPrefixOf(const RowDescriptor& other_desc) const;
+
+  /// Return true if the physical layout of this descriptor matches the 
physical layout
+  /// of other_desc, but not necessarily the ids.
+  bool LayoutEquals(const RowDescriptor& other_desc) const;
+
   std::string DebugString() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 8fb0b55..8dfc4ba 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -381,7 +381,7 @@ int RowBatch::GetBatchSize(const TRowBatch& batch) {
 }
 
 void RowBatch::AcquireState(RowBatch* src) {
-  DCHECK(row_desc_.Equals(src->row_desc_));
+  DCHECK(row_desc_.LayoutEquals(src->row_desc_));
   DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
   DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 87832ad..c238e85 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -225,7 +225,7 @@ enum TImpalaQueryOptions {
   // those queries, the coordinator deletes all files in the final location 
before copying
   // the files there.
   // TODO: Find a way to get this working for INSERT OVERWRITEs too.
-  S3_SKIP_INSERT_STAGING
+  S3_SKIP_INSERT_STAGING,
 
   // Maximum runtime filter size, in bytes.
   RUNTIME_FILTER_MAX_SIZE,
@@ -265,7 +265,7 @@ enum TImpalaQueryOptions {
   // It is preserved as the default for compatibility.
   // TODO: Remove the TWO_LEVEL_THEN_THREE_LEVEL mode completely or at least 
make
   // it non-default in a compatibility breaking release.
-  PARQUET_ARRAY_RESOLUTION
+  PARQUET_ARRAY_RESOLUTION,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 19a654d..b83b84d 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -445,6 +445,8 @@ struct TUnionNode {
   2: required list<list<Exprs.TExpr>> result_expr_lists
   // Separate list of expr lists coming from a constant select stmts.
   3: required list<list<Exprs.TExpr>> const_expr_lists
+  // Index of the first child that needs to be materialized.
+  4: required i64 first_materialized_child_idx
 }
 
 struct TExchangeNode {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 109cfa2..f67ac8b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -220,6 +220,20 @@ public class SlotDescriptor {
     setType(expr.getType());
   }
 
+  /**
+   * Return true if the physical layout of this descriptor matches the 
physical layout
+   * of the other descriptor, but not necessarily ids.
+   */
+  public boolean LayoutEquals(SlotDescriptor other) {
+    if (!getType().equals(other.getType())) return false;
+    if (isNullable_ != other.isNullable_) return false;
+    if (getByteSize() != other.getByteSize()) return false;
+    if (getByteOffset() != other.getByteOffset()) return false;
+    if (getNullIndicatorByte() != other.getNullIndicatorByte()) return false;
+    if (getNullIndicatorBit() != other.getNullIndicatorBit()) return false;
+    return true;
+  }
+
   public TSlotDescriptor toThrift() {
     Preconditions.checkState(isMaterialized_);
     List<Integer> materializedPath = getMaterializedPath();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 077c4cb..2e501c1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -335,18 +335,6 @@ public class TupleDescriptor {
   }
 
   /**
-   * Returns true if tuples of type 'this' can be assigned to tuples of type 
'desc'
-   * (checks that both have the same number of slots and that slots are of the 
same type)
-   */
-  public boolean isCompatible(TupleDescriptor desc) {
-    if (slots_.size() != desc.slots_.size()) return false;
-    for (int i = 0; i < slots_.size(); ++i) {
-      if (!slots_.get(i).getType().equals(desc.slots_.get(i).getType())) 
return false;
-    }
-    return true;
-  }
-
-  /**
    * Returns a list of slot ids that correspond to partition columns.
    */
   public List<SlotId> getPartitionSlots() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index a0e7164..f811e49 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -152,6 +152,10 @@ public class UnionStmt extends QueryStmt {
   // true if any of the operands_ references an AnalyticExpr
   private boolean hasAnalyticExprs_ = false;
 
+  // List of output expressions produced by the union without the ORDER BY 
portion
+  // (if any). Same as resultExprs_ if there is no ORDER BY.
+  private List<Expr> unionResultExprs_ = Lists.newArrayList();
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -183,6 +187,7 @@ public class UnionStmt extends QueryStmt {
     toSqlString_ = (other.toSqlString_ != null) ? new 
String(other.toSqlString_) : null;
     hasAnalyticExprs_ = other.hasAnalyticExprs_;
     withClause_ = (other.withClause_ != null) ? other.withClause_.clone() : 
null;
+    unionResultExprs_ = Expr.cloneList(other.unionResultExprs_);
   }
 
   public List<UnionOperand> getOperands() { return operands_; }
@@ -262,6 +267,7 @@ public class UnionStmt extends QueryStmt {
       }
     }
 
+    unionResultExprs_ = Expr.cloneList(resultExprs_);
     if (evaluateOrderBy_) createSortTupleInfo(analyzer);
     baseTblResultExprs_ = resultExprs_;
   }
@@ -503,6 +509,7 @@ public class UnionStmt extends QueryStmt {
         }
       }
 
+      boolean isNullable = false;
       // register single-directional value transfers from output slot
       // to operands' result exprs (if those happen to be slotrefs);
       // don't do that if the operand computes analytic exprs
@@ -510,11 +517,16 @@ public class UnionStmt extends QueryStmt {
       for (UnionOperand op: operands_) {
         Expr resultExpr = op.getQueryStmt().getResultExprs().get(i);
         slotDesc.addSourceExpr(resultExpr);
+        SlotRef slotRef = resultExpr.unwrapSlotRef(false);
+        if (slotRef == null || slotRef.getDesc().getIsNullable()) isNullable = 
true;
         if (op.hasAnalyticExprs()) continue;
-        SlotRef slotRef = resultExpr.unwrapSlotRef(true);
+        slotRef = resultExpr.unwrapSlotRef(true);
         if (slotRef == null) continue;
         analyzer.registerValueTransfer(outputSlotRef.getSlotId(), 
slotRef.getSlotId());
       }
+      // If all the child slots are not nullable, then the union output slot 
should not
+      // be nullable as well.
+      slotDesc.setIsNullable(isNullable);
     }
     baseTblResultExprs_ = resultExprs_;
   }
@@ -601,6 +613,8 @@ public class UnionStmt extends QueryStmt {
     return operands_.get(0).getQueryStmt().getColLabels();
   }
 
+  public List<Expr> getUnionResultExprs() { return unionResultExprs_; }
+
   @Override
   public UnionStmt clone() { return new UnionStmt(this); }
 
@@ -621,5 +635,6 @@ public class UnionStmt extends QueryStmt {
     tupleId_ = null;
     toSqlString_ = null;
     hasAnalyticExprs_ = false;
+    unionResultExprs_.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 71ddd5a..8fe3cf9 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -681,7 +681,6 @@ public class DistributedPlanner {
         childFragment.setOutputPartition(DataPartition.RANDOM);
       }
     }
-    unionNode.reorderOperands(ctx_.getRootAnalyzer());
     unionNode.init(ctx_.getRootAnalyzer());
     return unionFragment;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index ed43421..5c8218c 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1529,7 +1529,8 @@ public class SingleNodePlanner {
       Analyzer analyzer, UnionStmt unionStmt, List<UnionOperand> unionOperands,
       PlanNode unionDistinctPlan)
       throws ImpalaException {
-    UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), 
unionStmt.getTupleId());
+    UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), 
unionStmt.getTupleId(),
+        unionStmt.getUnionResultExprs(), ctx_.hasSubplan());
     for (UnionOperand op: unionOperands) {
       if (op.getAnalyzer().hasEmptyResultSet()) {
         unmarkCollectionSlots(op.getQueryStmt());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/main/java/org/apache/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java 
b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index e88b0e7..d724c59 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -18,15 +18,14 @@
 package org.apache.impala.planner;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
-import org.apache.impala.common.Pair;
+import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TPlanNode;
@@ -35,16 +34,26 @@ import org.apache.impala.thrift.TUnionNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Node that merges the results of its child plans by materializing
- * the corresponding result exprs into a new tuple.
+ * Node that merges the results of its child plans, Normally, this is done by
+ * materializing the corresponding result exprs into a new tuple. However, if
+ * a child has an identical tuple layout as the output of the union node, and
+ * the child only has naked SlotRefs as result exprs, then the child is marked
+ * as 'passthrough'. The rows of passthrough children are directly returned by
+ * the union node, instead of materializing the child's result exprs into new
+ * tuples.
  */
 public class UnionNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(UnionNode.class);
 
+  // List of union result exprs of the originating UnionStmt. Used for
+  // determining passthrough-compatibility of children.
+  protected List<Expr> unionResultExprs_;
+
   // Expr lists corresponding to the input query stmts.
   // The ith resultExprList belongs to the ith child.
   // All exprs are resolved to base tables.
@@ -59,11 +68,27 @@ public class UnionNode extends PlanNode {
   protected List<List<Expr>> materializedResultExprLists_ = 
Lists.newArrayList();
   protected List<List<Expr>> materializedConstExprLists_ = 
Lists.newArrayList();
 
+  // Indicates if this UnionNode is inside a subplan.
+  protected boolean isInSubplan_;
+
+  // Index of the first non-passthrough child.
+  protected int firstMaterializedChildIdx_;
+
   protected final TupleId tupleId_;
 
   protected UnionNode(PlanNodeId id, TupleId tupleId) {
     super(id, tupleId.asList(), "UNION");
+    unionResultExprs_ = Lists.newArrayList();
+    tupleId_ = tupleId;
+    isInSubplan_ = false;
+  }
+
+  protected UnionNode(PlanNodeId id, TupleId tupleId,
+        List<Expr> unionResultExprs, boolean isInSubplan) {
+    super(id, tupleId.asList(), "UNION");
+    unionResultExprs_ = unionResultExprs;
     tupleId_ = tupleId;
+    isInSubplan_ = isInSubplan;
   }
 
   public void addConstExprList(List<Expr> exprs) { constExprLists_.add(exprs); 
}
@@ -103,51 +128,80 @@ public class UnionNode extends PlanNode {
   }
 
   /**
-   * Re-order the union's operands descending by their estimated per-host 
memory,
-   * such that parent nodes can gauge the peak memory consumption of this 
MergeNode after
-   * opening it during execution (a MergeNode opens its first operand in 
Open()).
-   * Scan nodes are always ordered last because they can dynamically scale 
down their
-   * memory usage, whereas many other nodes cannot (e.g., joins, aggregations).
-   * One goal is to decrease the likelihood of a SortNode parent claiming too 
much
-   * memory in its Open(), possibly causing the mem limit to be hit when 
subsequent
-   * union operands are executed.
-   * Can only be called on a fragmented plan because this function calls 
computeCosts()
-   * on this node's children.
-   * TODO: Come up with a good way of handing memory out to individual 
operators so that
-   * they don't trip each other up. Then remove this function.
+   * Returns true if rows from the child with 'childTupleIds' and 
'childResultExprs' can
+   * be returned directly by the union node (without materialization into a 
new tuple).
    */
-  public void reorderOperands(Analyzer analyzer) {
-    Preconditions.checkNotNull(fragment_,
-        "Operands can only be reordered on the fragmented plan.");
-
-    // List of estimated per-host memory consumption (first) by child index 
(second).
-    List<Pair<Long, Integer>> memByChildIdx = Lists.newArrayList();
-    for (int i = 0; i < children_.size(); ++i) {
-      PlanNode child = children_.get(i);
-      
child.computeCosts(analyzer.getQueryCtx().client_request.getQuery_options());
-      memByChildIdx.add(new Pair<Long, Integer>(child.getPerHostMemCost(), i));
+  private boolean isChildPassthrough(
+      Analyzer analyzer, PlanNode childNode, List<Expr> childExprList) {
+    List<TupleId> childTupleIds = childNode.getTupleIds();
+    // Check that if the child outputs a single tuple, then it's not nullable. 
Tuple
+    // nullability can be considered to be part of the physical row layout.
+    Preconditions.checkState(childTupleIds.size() != 1 ||
+        !childNode.getNullableTupleIds().contains(childTupleIds.get(0)));
+    // If the Union node is inside a subplan, passthrough should be disabled 
to avoid
+    // performance issues by forcing tiny batches.
+    // TODO: Remove this as part of IMPALA-4179.
+    if (isInSubplan_) return false;
+    // Pass through is only done for the simple case where the row has a 
single tuple. One
+    // of the motivations for this is that the output of a UnionNode is a row 
with a
+    // single tuple.
+    if (childTupleIds.size() != 1) return false;
+    Preconditions.checkState(!unionResultExprs_.isEmpty());
+
+    TupleDescriptor unionTupleDescriptor = 
analyzer.getDescTbl().getTupleDesc(tupleId_);
+    TupleDescriptor childTupleDescriptor =
+        analyzer.getDescTbl().getTupleDesc(childTupleIds.get(0));
+
+    // Verify that the union tuple descriptor has one slot for every 
expression.
+    Preconditions.checkState(
+        unionTupleDescriptor.getSlots().size() == unionResultExprs_.size());
+    // Verify that the union node has one slot for every child expression.
+    Preconditions.checkState(
+        unionTupleDescriptor.getSlots().size() == childExprList.size());
+
+    if (unionResultExprs_.size() != childTupleDescriptor.getSlots().size()) 
return false;
+    if (unionTupleDescriptor.getByteSize() != 
childTupleDescriptor.getByteSize()) {
+      return false;
     }
 
-    Collections.sort(memByChildIdx,
-        new Comparator<Pair<Long, Integer>>() {
-      public int compare(Pair<Long, Integer> a, Pair<Long, Integer> b) {
-        PlanNode aNode = children_.get(a.second);
-        PlanNode bNode = children_.get(b.second);
-        // Order scan nodes last because they can dynamically scale down their 
mem.
-        if (bNode instanceof ScanNode && !(aNode instanceof ScanNode)) return 
-1;
-        if (aNode instanceof ScanNode && !(bNode instanceof ScanNode)) return 
1;
-        long diff = b.first - a.first;
-        return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
-      }
-    });
+    for (int i = 0; i < unionResultExprs_.size(); ++i) {
+      if (!unionTupleDescriptor.getSlots().get(i).isMaterialized()) continue;
+      SlotRef unionSlotRef = unionResultExprs_.get(i).unwrapSlotRef(false);
+      SlotRef childSlotRef = childExprList.get(i).unwrapSlotRef(false);
+      Preconditions.checkNotNull(unionSlotRef);
+      if (childSlotRef == null) return false;
+      if (!childSlotRef.getDesc().LayoutEquals(unionSlotRef.getDesc())) return 
false;
+    }
+    return true;
+  }
 
+  /**
+   * Compute which children are passthrough and reorder them such that the 
passthrough
+   * children come before the children that need to be materialized. Also 
reorder
+   * 'resultExprLists_'. The children are reordered to simplify the 
implementation in the
+   * BE.
+   */
+   void computePassthrough(Analyzer analyzer) {
     List<List<Expr>> newResultExprLists = Lists.newArrayList();
     ArrayList<PlanNode> newChildren = Lists.newArrayList();
-    for (Pair<Long, Integer> p: memByChildIdx) {
-      newResultExprLists.add(resultExprLists_.get(p.second));
-      newChildren.add(children_.get(p.second));
+    for (int i = 0; i < children_.size(); i++) {
+      if (isChildPassthrough(analyzer, children_.get(i), 
resultExprLists_.get(i))) {
+        newResultExprLists.add(resultExprLists_.get(i));
+        newChildren.add(children_.get(i));
+      }
     }
+    firstMaterializedChildIdx_ = newChildren.size();
+
+    for (int i = 0; i < children_.size(); i++) {
+      if (!isChildPassthrough(analyzer, children_.get(i), 
resultExprLists_.get(i))) {
+        newResultExprLists.add(resultExprLists_.get(i));
+        newChildren.add(children_.get(i));
+      }
+    }
+
+    Preconditions.checkState(resultExprLists_.size() == 
newResultExprLists.size());
     resultExprLists_ = newResultExprLists;
+    Preconditions.checkState(children_.size() == newChildren.size());
     children_ = newChildren;
   }
 
@@ -165,6 +219,7 @@ public class UnionNode extends PlanNode {
     Preconditions.checkState(conjuncts_.isEmpty());
     computeMemLayout(analyzer);
     computeStats(analyzer);
+    computePassthrough(analyzer);
 
     // drop resultExprs/constExprs that aren't getting materialized (= where 
the
     // corresponding output slot isn't being materialized)
@@ -206,7 +261,9 @@ public class UnionNode extends PlanNode {
     for (List<Expr> constTexprList: materializedConstExprLists_) {
       constTexprLists.add(Expr.treesToThrift(constTexprList));
     }
-    msg.union_node = new TUnionNode(tupleId_.asInt(), texprLists, 
constTexprLists);
+    Preconditions.checkState(firstMaterializedChildIdx_ <= children_.size());
+    msg.union_node = new TUnionNode(
+        tupleId_.asInt(), texprLists, constTexprLists, 
firstMaterializedChildIdx_);
     msg.node_type = TPlanNodeType.UNION_NODE;
   }
 
@@ -223,6 +280,20 @@ public class UnionNode extends PlanNode {
     if (!constExprLists_.isEmpty()) {
       output.append(detailPrefix + "constant-operands=" + 
constExprLists_.size() + "\n");
     }
+    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+      List<String> passThroughNodeIds = Lists.newArrayList();
+      for (int i = 0; i < firstMaterializedChildIdx_; ++i) {
+        passThroughNodeIds.add(children_.get(i).getId().toString());
+      }
+      if (!passThroughNodeIds.isEmpty()) {
+        String result = detailPrefix + "pass-through-operands: ";
+        if (passThroughNodeIds.size() == children_.size()) {
+          output.append(result + "all\n");
+        } else {
+          output.append(result + Joiner.on(",").join(passThroughNodeIds) + 
"\n");
+        }
+      }
+    }
     return output.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 9b641a0..c6a71e3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3511,7 +3511,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
   @Test
   public void TestClone() {
     testNumberOfMembers(QueryStmt.class, 9);
-    testNumberOfMembers(UnionStmt.class, 8);
+    testNumberOfMembers(UnionStmt.class, 9);
     testNumberOfMembers(ValuesStmt.class, 0);
 
     // Also check TableRefs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 2fec7c4..a92cee3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -184,6 +184,7 @@ PLAN-ROOT SINK
 |  limit: 10
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -203,6 +204,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -226,6 +228,7 @@ PLAN-ROOT SINK
 |  limit: 10
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -250,6 +253,7 @@ PLAN-ROOT SINK
 |  group by: bigint_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -275,6 +279,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -302,6 +307,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -329,6 +335,7 @@ PLAN-ROOT SINK
 |  group by: bigint_col, int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -355,6 +362,7 @@ PLAN-ROOT SINK
 |  group by: bigint_col, int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -381,6 +389,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -410,6 +419,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -438,6 +448,7 @@ PLAN-ROOT SINK
 |  group by: bigint_col, int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -466,6 +477,7 @@ PLAN-ROOT SINK
 |  group by: bigint_col, int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -497,6 +509,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -533,6 +546,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -576,6 +590,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |
 00:UNION
+|  pass-through-operands: all
 |  limit: 10
 |
 |--02:SCAN HDFS [functional.alltypessmall]
@@ -593,6 +608,7 @@ PLAN-ROOT SINK
 |  limit: 10
 |
 00:UNION
+|  pass-through-operands: all
 |  limit: 10
 |
 |--02:SCAN HDFS [functional.alltypessmall]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index de5e4cb..cb7dd01 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -1138,43 +1138,44 @@ PLAN-ROOT SINK
 |  order by: min(id) OVER(...) DESC NULLS FIRST
 |
 08:UNION
+|  pass-through-operands: 07
 |
-|--07:AGGREGATE [FINALIZE]
-|  |  group by: min(id) OVER(...)
-|  |
-|  00:UNION
+|--11:ANALYTIC
+|  |  functions: sum(bigint_col)
+|  |  partition by: int_col
+|  |  order by: id ASC
+|  |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  |
-|  |--06:ANALYTIC
-|  |  |  functions: max(id)
-|  |  |  partition by: bool_col
-|  |  |
-|  |  05:SORT
-|  |  |  order by: bool_col ASC NULLS FIRST
-|  |  |
-|  |  04:SCAN HDFS [functional.alltypessmall]
-|  |     partitions=4/4 files=4 size=6.32KB
+|  10:SORT
+|  |  order by: int_col ASC NULLS FIRST, id ASC
 |  |
-|  03:ANALYTIC
-|  |  functions: min(id)
-|  |  partition by: int_col
+|  09:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+07:AGGREGATE [FINALIZE]
+|  group by: min(id) OVER(...)
+|
+00:UNION
+|
+|--06:ANALYTIC
+|  |  functions: max(id)
+|  |  partition by: bool_col
 |  |
-|  02:SORT
-|  |  order by: int_col ASC NULLS FIRST
+|  05:SORT
+|  |  order by: bool_col ASC NULLS FIRST
 |  |
-|  01:SCAN HDFS [functional.alltypes]
-|     partitions=24/24 files=24 size=478.45KB
+|  04:SCAN HDFS [functional.alltypessmall]
+|     partitions=4/4 files=4 size=6.32KB
 |
-11:ANALYTIC
-|  functions: sum(bigint_col)
+03:ANALYTIC
+|  functions: min(id)
 |  partition by: int_col
-|  order by: id ASC
-|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |
-10:SORT
-|  order by: int_col ASC NULLS FIRST, id ASC
+02:SORT
+|  order by: int_col ASC NULLS FIRST
 |
-09:SCAN HDFS [functional.alltypestiny]
-   partitions=4/4 files=4 size=460B
+01:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -1185,6 +1186,7 @@ PLAN-ROOT SINK
 |  order by: min(id) OVER(...) DESC NULLS FIRST
 |
 08:UNION
+|  pass-through-operands: 16
 |
 |--11:ANALYTIC
 |  |  functions: sum(bigint_col)
@@ -1195,15 +1197,15 @@ PLAN-ROOT SINK
 |  10:SORT
 |  |  order by: int_col ASC NULLS FIRST, id ASC
 |  |
-|  13:EXCHANGE [HASH(int_col)]
+|  17:EXCHANGE [HASH(int_col)]
 |  |
 |  09:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |
-17:AGGREGATE [FINALIZE]
+16:AGGREGATE [FINALIZE]
 |  group by: min(id) OVER(...)
 |
-16:EXCHANGE [HASH(min(id) OVER(...))]
+15:EXCHANGE [HASH(min(id) OVER(...))]
 |
 07:AGGREGATE [STREAMING]
 |  group by: min(id) OVER(...)
@@ -1217,7 +1219,7 @@ PLAN-ROOT SINK
 |  05:SORT
 |  |  order by: bool_col ASC NULLS FIRST
 |  |
-|  15:EXCHANGE [HASH(bool_col)]
+|  14:EXCHANGE [HASH(bool_col)]
 |  |
 |  04:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -1229,7 +1231,7 @@ PLAN-ROOT SINK
 02:SORT
 |  order by: int_col ASC NULLS FIRST
 |
-14:EXCHANGE [HASH(int_col)]
+13:EXCHANGE [HASH(int_col)]
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -1814,25 +1816,25 @@ PLAN-ROOT SINK
 |
 00:UNION
 |
-|--01:SCAN HDFS [functional.alltypestiny t1]
-|     partitions=4/4 files=4 size=460B
-|     predicates: t1.id IS NULL, t1.tinyint_col != 5
-|
-08:EXCHANGE [RANDOM]
-|
-04:ANALYTIC
-|  functions: dense_rank()
-|  order by: id ASC
-|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|
-07:MERGING-EXCHANGE [UNPARTITIONED]
-|  order by: id ASC
-|
-03:SORT
-|  order by: id ASC
+|--08:EXCHANGE [RANDOM]
+|  |
+|  04:ANALYTIC
+|  |  functions: dense_rank()
+|  |  order by: id ASC
+|  |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  |
+|  07:MERGING-EXCHANGE [UNPARTITIONED]
+|  |  order by: id ASC
+|  |
+|  03:SORT
+|  |  order by: id ASC
+|  |
+|  02:SCAN HDFS [functional.alltypes t1]
+|     partitions=24/24 files=24 size=478.45KB
 |
-02:SCAN HDFS [functional.alltypes t1]
-   partitions=24/24 files=24 size=478.45KB
+01:SCAN HDFS [functional.alltypestiny t1]
+   partitions=4/4 files=4 size=460B
+   predicates: t1.id IS NULL, t1.tinyint_col != 5
 ====
 # Propagate a predicate on a partition key through an inline view that has an 
analytic
 # function. Predicates that are not compatible with analytic function's 
partition by

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
index 014a1ea..aa4d313 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
@@ -110,6 +110,7 @@ select * from functional.alltypestiny
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
@@ -250,6 +251,7 @@ select * from functional.alltypestiny
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
@@ -387,6 +389,7 @@ from functional.alltypes where false
 PLAN-ROOT SINK
 |
 02:UNION
+|  pass-through-operands: all
 |
 01:AGGREGATE [FINALIZE]
 |  group by: lead(-496, 81, NULL) OVER(...)
@@ -411,14 +414,15 @@ PLAN-ROOT SINK
 |
 02:UNION
 |  constant-operands=1
+|  pass-through-operands: 01
 |
-|--01:AGGREGATE [FINALIZE]
-|  |  group by: lead(-496, 81, NULL) OVER(...)
-|  |
-|  00:UNION
+|--03:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+01:AGGREGATE [FINALIZE]
+|  group by: lead(-496, 81, NULL) OVER(...)
 |
-03:SCAN HDFS [functional.alltypestiny]
-   partitions=4/4 files=4 size=460B
+00:UNION
 ====
 # IMPALA-2216: Make sure the final output exprs are substituted, even
 # if the resulting plan is an EmptySetNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
index 13f6326..fdaed6d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
@@ -984,6 +984,7 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  constant-operands=1
+|  pass-through-operands: all
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -995,6 +996,7 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  constant-operands=1
+|  pass-through-operands: all
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 6b5291a..63db8ba 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -326,3 +326,33 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypestiny]
    predicates: id IN (1, NULL), bigint_col IN (NULL), bool_col IN (NULL), 
double_col IN (NULL), float_col IN (NULL), smallint_col IN (NULL), string_col 
IN (NULL), tinyint_col IN (NULL)
 ====
+# IMPALA-3586: The operand with the Kudu scan cannot be passed through because 
id is
+# not-nullable (primary key).
+select id from functional_kudu.alltypes
+union all
+select id from functional.alltypes;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: 02
+|
+|--01:SCAN KUDU [functional_kudu.alltypes]
+|
+02:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-3586: When both operands are Kudu scans, they should both be passed 
through.
+select id from functional_kudu.alltypes
+union all
+select id from functional_kudu.alltypes;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|
+|--02:SCAN KUDU [functional_kudu.alltypes]
+|
+01:SCAN KUDU [functional_kudu.alltypes]
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/order.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/order.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
index 266c517..8bbcc7c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
@@ -218,6 +218,7 @@ PLAN-ROOT SINK
 |  order by: int_col DESC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -235,6 +236,7 @@ PLAN-ROOT SINK
 |  order by: int_col DESC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -258,6 +260,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -283,6 +286,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -546,6 +550,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--04:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
@@ -576,6 +581,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--04:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
@@ -604,11 +610,13 @@ PLAN-ROOT SINK
 |  order by: id ASC, bool_col ASC
 |
 04:UNION
+|  pass-through-operands: all
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |  |
 |  00:UNION
+|  |  pass-through-operands: all
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=1/4 files=1 size=115B
@@ -631,27 +639,29 @@ PLAN-ROOT SINK
 |  order by: id ASC, bool_col ASC
 |
 04:UNION
+|  pass-through-operands: all
 |
-|--06:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
-|
-|--05:SCAN HDFS [functional.alltypestiny]
+|--09:AGGREGATE [FINALIZE]
+|  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  |
+|  08:EXCHANGE 
[HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)]
+|  |
+|  03:AGGREGATE [STREAMING]
+|  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  |
+|  00:UNION
+|  |  pass-through-operands: all
+|  |
+|  |--02:SCAN HDFS [functional.alltypestiny]
+|  |     partitions=1/4 files=1 size=115B
+|  |
+|  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-09:AGGREGATE [FINALIZE]
-|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
-|
-08:EXCHANGE 
[HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)]
-|
-03:AGGREGATE [STREAMING]
-|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
-|
-00:UNION
-|
-|--02:SCAN HDFS [functional.alltypestiny]
+|--06:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-01:SCAN HDFS [functional.alltypestiny]
+05:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
 ====
 select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month 
from functional.alltypestiny where year=2009 and month=1
@@ -681,16 +691,31 @@ PLAN-ROOT SINK
 |  order by: year ASC, month ASC, id ASC
 |
 09:UNION
+|  pass-through-operands: 10,11,08
+|
+|--15:TOP-N [LIMIT=3]
+|  |  order by: id ASC
+|  |
+|  12:UNION
+|  |  pass-through-operands: all
+|  |
+|  |--14:SCAN HDFS [functional.alltypestiny]
+|  |     partitions=0/4 files=0 size=0B
+|  |
+|  13:SCAN HDFS [functional.alltypestiny]
+|     partitions=1/4 files=1 size=115B
 |
 |--08:AGGREGATE [FINALIZE]
 |  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |  |
 |  00:UNION
+|  |  pass-through-operands: 01,02,03
 |  |
 |  |--07:TOP-N [LIMIT=3]
 |  |  |  order by: id ASC
 |  |  |
 |  |  04:UNION
+|  |  |  pass-through-operands: all
 |  |  |
 |  |  |--06:SCAN HDFS [functional.alltypestiny]
 |  |  |     partitions=1/4 files=1 size=115B
@@ -707,17 +732,6 @@ PLAN-ROOT SINK
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-|--15:TOP-N [LIMIT=3]
-|  |  order by: id ASC
-|  |
-|  12:UNION
-|  |
-|  |--14:SCAN HDFS [functional.alltypestiny]
-|  |     partitions=0/4 files=0 size=0B
-|  |
-|  13:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
-|
 |--11:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
@@ -733,16 +747,11 @@ PLAN-ROOT SINK
 |  order by: year ASC, month ASC, id ASC
 |
 09:UNION
-|
-|--11:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
-|
-|--10:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
+|  pass-through-operands: 10,11,20
 |
 |--22:EXCHANGE [RANDOM]
 |  |
-|  17:MERGING-EXCHANGE [UNPARTITIONED]
+|  21:MERGING-EXCHANGE [UNPARTITIONED]
 |  |  order by: id ASC
 |  |  limit: 3
 |  |
@@ -750,6 +759,7 @@ PLAN-ROOT SINK
 |  |  order by: id ASC
 |  |
 |  12:UNION
+|  |  pass-through-operands: all
 |  |
 |  |--14:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=0/4 files=0 size=0B
@@ -757,40 +767,48 @@ PLAN-ROOT SINK
 |  13:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-21:AGGREGATE [FINALIZE]
-|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
-|
-20:EXCHANGE 
[HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)]
-|
-08:AGGREGATE [STREAMING]
-|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
-|
-00:UNION
-|
-|--03:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
-|
-|--02:SCAN HDFS [functional.alltypestiny]
-|     partitions=1/4 files=1 size=115B
-|
-|--01:SCAN HDFS [functional.alltypestiny]
+|--20:AGGREGATE [FINALIZE]
+|  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  |
+|  19:EXCHANGE 
[HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)]
+|  |
+|  08:AGGREGATE [STREAMING]
+|  |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  |
+|  00:UNION
+|  |  pass-through-operands: 01,02,03
+|  |
+|  |--18:EXCHANGE [RANDOM]
+|  |  |
+|  |  17:MERGING-EXCHANGE [UNPARTITIONED]
+|  |  |  order by: id ASC
+|  |  |  limit: 3
+|  |  |
+|  |  07:TOP-N [LIMIT=3]
+|  |  |  order by: id ASC
+|  |  |
+|  |  04:UNION
+|  |  |  pass-through-operands: all
+|  |  |
+|  |  |--06:SCAN HDFS [functional.alltypestiny]
+|  |  |     partitions=1/4 files=1 size=115B
+|  |  |
+|  |  05:SCAN HDFS [functional.alltypestiny]
+|  |     partitions=1/4 files=1 size=115B
+|  |
+|  |--03:SCAN HDFS [functional.alltypestiny]
+|  |     partitions=1/4 files=1 size=115B
+|  |
+|  |--02:SCAN HDFS [functional.alltypestiny]
+|  |     partitions=1/4 files=1 size=115B
+|  |
+|  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-19:EXCHANGE [RANDOM]
-|
-18:MERGING-EXCHANGE [UNPARTITIONED]
-|  order by: id ASC
-|  limit: 3
-|
-07:TOP-N [LIMIT=3]
-|  order by: id ASC
-|
-04:UNION
-|
-|--06:SCAN HDFS [functional.alltypestiny]
+|--11:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
 |
-05:SCAN HDFS [functional.alltypestiny]
+10:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
 ====
 # Sort over top-n
@@ -839,6 +857,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -859,6 +878,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -946,6 +966,7 @@ select * from functional.alltypessmall order by bigint_col
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -958,6 +979,7 @@ PLAN-ROOT SINK
 03:EXCHANGE [UNPARTITIONED]
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -976,6 +998,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -992,6 +1015,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -1014,6 +1038,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--05:TOP-N [LIMIT=10 OFFSET=5]
 |  |  order by: int_col ASC
@@ -1048,9 +1073,7 @@ PLAN-ROOT SINK
 |  group by: int_col
 |
 00:UNION
-|
-|--01:SCAN HDFS [functional.alltypes]
-|     partitions=24/24 files=24 size=478.45KB
+|  pass-through-operands: all
 |
 |--11:EXCHANGE [RANDOM]
 |  |
@@ -1065,16 +1088,19 @@ PLAN-ROOT SINK
 |  04:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |
-10:EXCHANGE [RANDOM]
-|
-08:MERGING-EXCHANGE [UNPARTITIONED]
-|  order by: int_col ASC
-|  limit: 10
-|
-03:TOP-N [LIMIT=10]
-|  order by: int_col ASC
+|--10:EXCHANGE [RANDOM]
+|  |
+|  08:MERGING-EXCHANGE [UNPARTITIONED]
+|  |  order by: int_col ASC
+|  |  limit: 10
+|  |
+|  03:TOP-N [LIMIT=10]
+|  |  order by: int_col ASC
+|  |
+|  02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
 |
-02:SCAN HDFS [functional.alltypes]
+01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
 # Test slot materialization
@@ -1200,6 +1226,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
@@ -1217,6 +1244,7 @@ PLAN-ROOT SINK
 |  order by: bigint_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
index 0938475..1d0bfe5 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
@@ -870,6 +870,7 @@ PLAN-ROOT SINK
 |  other predicates: id IS NULL, int_col = 17
 |
 |--01:UNION
+|  |  pass-through-operands: all
 |  |
 |  |--05:AGGREGATE [FINALIZE]
 |  |  |  group by: id, int_col
@@ -933,6 +934,7 @@ PLAN-ROOT SINK
 |     predicates: isnull(a.id, 0) = 0
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  group by: id, int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 6042557..7f18c84 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -714,6 +714,7 @@ PLAN-ROOT SINK
 |     predicates: t3.bool_col = FALSE
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
@@ -748,6 +749,7 @@ PLAN-ROOT SINK
 |     predicates: b.int_col = 1
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--03:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
@@ -789,6 +791,7 @@ PLAN-ROOT SINK
 |  group by: month, year
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--03:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index 60e9dd9..2c91472 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -149,6 +149,7 @@ select * from functional_hbase.alltypes limit 2
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 2
@@ -159,6 +160,7 @@ PLAN-ROOT SINK
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 2
@@ -173,6 +175,7 @@ select * from functional_hbase.alltypes limit 5
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 5
@@ -183,6 +186,7 @@ PLAN-ROOT SINK
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--04:EXCHANGE [UNPARTITIONED]
 |  |  limit: 5
@@ -278,19 +282,20 @@ where year=2009 and month=2
 PLAN-ROOT SINK
 |
 04:UNION
+|  pass-through-operands: 03
 |
-|--03:AGGREGATE [FINALIZE]
-|  |  group by: id, bool_col
-|  |
-|  00:UNION
-|  |
-|  |--02:SCAN HDFS [functional.alltypestiny b]
-|  |     partitions=1/4 files=1 size=115B
-|  |
-|  01:SCAN HDFS [functional.alltypestiny a]
+|--05:SCAN HDFS [functional.alltypestiny c]
+|     partitions=1/4 files=1 size=115B
+|
+03:AGGREGATE [FINALIZE]
+|  group by: id, bool_col
+|
+00:UNION
+|
+|--02:SCAN HDFS [functional.alltypestiny b]
 |     partitions=1/4 files=1 size=115B
 |
-05:SCAN HDFS [functional.alltypestiny c]
+01:SCAN HDFS [functional.alltypestiny a]
    partitions=1/4 files=1 size=115B
 ====
 # IMPALA-2527: Tests that the small query optimization is disabled for 
colleciton types

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 90b1713..44e59a0 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -2064,6 +2064,7 @@ union all
 PLAN-ROOT SINK
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--05:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B
@@ -2095,6 +2096,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--05:SCAN HDFS [functional.alltypestiny]
 |     partitions=1/4 files=1 size=115B

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a50c3440/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/topn.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
index 4252ac7..d3ab315 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
@@ -289,6 +289,7 @@ PLAN-ROOT SINK
 |  order by: int_col DESC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -307,6 +308,7 @@ PLAN-ROOT SINK
 |  order by: int_col DESC
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -330,6 +332,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -356,6 +359,7 @@ PLAN-ROOT SINK
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 |
 00:UNION
+|  pass-through-operands: all
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
@@ -401,6 +405,7 @@ PLAN-ROOT SINK
 |  order by: int_col ASC
 |
 00:UNION
+|  pass-through-operands: all
 |  limit: 10
 |
 |--02:SCAN HDFS [functional.alltypessmall]
@@ -418,6 +423,7 @@ PLAN-ROOT SINK
 |  limit: 10
 |
 00:UNION
+|  pass-through-operands: all
 |  limit: 10
 |
 |--02:SCAN HDFS [functional.alltypessmall]

Reply via email to