Repository: incubator-impala
Updated Branches:
  refs/heads/master baba8960b -> 58b206ff0


IMPALA-4883: Union Codegen

For each non-passthrough child of the Union node, codegen the loop that
does per row tuple materialization.

Testing:
Ran the test_queries.py tests locally in exhaustive mode.

Benchmark:
Ran a local benchmark on a local 10 GB TPCDS dataset on an unpartitioned
store_sales table.

-- Ten identical UNION ALL operands over the same unpartitioned store_sales table.
SELECT
    COUNT(c),
    COUNT(ss_customer_sk),
    COUNT(ss_cdemo_sk),
    COUNT(ss_hdemo_sk),
    COUNT(ss_addr_sk),
    COUNT(ss_store_sk),
    COUNT(ss_promo_sk),
    COUNT(ss_ticket_number),
    COUNT(ss_quantity),
    COUNT(ss_wholesale_cost),
    COUNT(ss_list_price),
    COUNT(ss_sales_price),
    COUNT(ss_ext_discount_amt),
    COUNT(ss_ext_sales_price),
    COUNT(ss_ext_wholesale_cost),
    COUNT(ss_ext_list_price),
    COUNT(ss_ext_tax),
    COUNT(ss_coupon_amt),
    COUNT(ss_net_paid),
    COUNT(ss_net_paid_inc_tax),
    COUNT(ss_net_profit),
    COUNT(ss_sold_date_sk)
FROM (
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
    UNION ALL
    SELECT fnv_hash(ss_sold_time_sk) AS c, *
    FROM tpcds_10_parquet.store_sales_unpartitioned
) AS unioned

Before: 39s704ms
Operator          #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE           1  194.504us  194.504us        1           1   28.00 KB        -1.00 B  FINALIZE
12:EXCHANGE            1   17.284us   17.284us        3           1          0        -1.00 B  UNPARTITIONED
11:AGGREGATE           3    2s202ms    2s934ms        3           1  115.00 KB       10.00 MB
00:UNION               3   32s514ms   34s926ms  288.01M     288.01M    3.08 MB              0
|--02:SCAN HDFS        3  158.373ms  216.085ms   28.80M      28.80M  489.71 MB        1.88 GB  tpcds_10_parquet.store_sales
|--03:SCAN HDFS        3  167.002ms  171.738ms   28.80M      28.80M  489.74 MB        1.88 GB  tpcds_10_parquet.store_sales
|--04:SCAN HDFS        3  125.331ms  145.496ms   28.80M      28.80M  489.57 MB        1.88 GB  tpcds_10_parquet.store_sales
|--05:SCAN HDFS        3  148.478ms  194.311ms   28.80M      28.80M  489.69 MB        1.88 GB  tpcds_10_parquet.store_sales
|--06:SCAN HDFS        3  143.995ms  162.781ms   28.80M      28.80M  489.57 MB        1.88 GB  tpcds_10_parquet.store_sales
|--07:SCAN HDFS        3  169.731ms  250.201ms   28.80M      28.80M  489.58 MB        1.88 GB  tpcds_10_parquet.store_sales
|--08:SCAN HDFS        3  164.110ms  254.374ms   28.80M      28.80M  489.61 MB        1.88 GB  tpcds_10_parquet.store_sales
|--09:SCAN HDFS        3  135.631ms  162.117ms   28.80M      28.80M  489.63 MB        1.88 GB  tpcds_10_parquet.store_sales
|--10:SCAN HDFS        3  138.736ms  167.778ms   28.80M      28.80M  489.67 MB        1.88 GB  tpcds_10_parquet.store_sales
01:SCAN HDFS           3  202.015ms  248.728ms   28.80M      28.80M  489.68 MB        1.88 GB  tpcds_10_parquet.store_sales

After: 20s177ms
Operator          #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE           1  174.617us  174.617us        1           1   28.00 KB        -1.00 B  FINALIZE
12:EXCHANGE            1   16.693us   16.693us        3           1          0        -1.00 B  UNPARTITIONED
11:AGGREGATE           3    2s830ms    3s615ms        3           1  115.00 KB       10.00 MB
00:UNION               3    4s296ms    5s258ms  288.01M     288.01M    3.08 MB              0
|--02:SCAN HDFS        3    1s212ms    1s340ms   28.80M      28.80M  488.82 MB        1.88 GB  tpcds_10_parquet.store_sales
|--03:SCAN HDFS        3    1s387ms    1s570ms   28.80M      28.80M  489.37 MB        1.88 GB  tpcds_10_parquet.store_sales
|--04:SCAN HDFS        3    1s224ms    1s347ms   28.80M      28.80M  487.22 MB        1.88 GB  tpcds_10_parquet.store_sales
|--05:SCAN HDFS        3    1s245ms    1s321ms   28.80M      28.80M  489.25 MB        1.88 GB  tpcds_10_parquet.store_sales
|--06:SCAN HDFS        3    1s232ms    1s505ms   28.80M      28.80M  484.21 MB        1.88 GB  tpcds_10_parquet.store_sales
|--07:SCAN HDFS        3    1s348ms    1s518ms   28.80M      28.80M  488.20 MB        1.88 GB  tpcds_10_parquet.store_sales
|--08:SCAN HDFS        3    1s231ms    1s335ms   28.80M      28.80M  483.58 MB        1.88 GB  tpcds_10_parquet.store_sales
|--09:SCAN HDFS        3    1s179ms    1s349ms   28.80M      28.80M  482.76 MB        1.88 GB  tpcds_10_parquet.store_sales
|--10:SCAN HDFS        3    1s121ms    1s154ms   28.80M      28.80M  486.59 MB        1.88 GB  tpcds_10_parquet.store_sales
01:SCAN HDFS           3    1s284ms    1s523ms   28.80M      28.80M  486.70 MB        1.88 GB  tpcds_10_parquet.store_sales

Change-Id: Ib4107d27582ff5416172810364a6e76d3d93c439
Reviewed-on: http://gerrit.cloudera.org:8080/6459
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/75553165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/75553165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/75553165

Branch: refs/heads/master
Commit: 75553165eed6c5decaa7fd0bfa3ae4d537aeb7ff
Parents: baba896
Author: Taras Bobrovytsky <[email protected]>
Authored: Tue Mar 21 18:21:23 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Apr 21 04:53:09 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |   4 +-
 be/src/codegen/impala-ir.cc                     |   1 +
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/union-node-ir.cc                    |  51 +++++++
 be/src/exec/union-node.cc                       | 133 ++++++++++++-------
 be/src/exec/union-node.h                        |  34 ++++-
 .../queries/QueryTest/nested-types-subplan.test |   7 +-
 .../queries/QueryTest/union.test                |  29 ++++
 8 files changed, 203 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py 
b/be/src/codegen/gen_ir_descriptions.py
index 4b62dfe..8a82218 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -214,7 +214,9 @@ ir_functions = [
   ["MEMPOOL_CHECKED_ALLOCATE",
    "_ZN6impala7MemPool8AllocateILb1EEEPhli"],
   ["RUNTIME_FILTER_EVAL",
-   "_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE"]
+   "_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE"],
+  ["UNION_MATERIALIZE_BATCH",
+  "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"]
 ]
 
 enums_preamble = '\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 5ff816b..6c8a9d5 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -37,6 +37,7 @@
 #include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"
 #include "exec/topn-node-ir.cc"
+#include "exec/union-node-ir.cc"
 #include "exprs/aggregate-functions-ir.cc"
 #include "exprs/bit-byte-functions-ir.cc"
 #include "exprs/cast-functions-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1193660..8632790 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(Exec
   topn-node.cc
   topn-node-ir.cc
   union-node.cc
+  union-node-ir.cc
   unnest-node.cc
 )
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node-ir.cc b/be/src/exec/union-node-ir.cc
new file mode 100644
index 0000000..38dfc97
--- /dev/null
+++ b/be/src/exec/union-node-ir.cc
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/union-node.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(const std::vector<ExprContext*>& exprs,
+    TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
+  DCHECK(!dst_batch->AtCapacity());  // Callers must leave room for one more row.
+  Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);  // Caller-provided slot.
+  TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
+  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs, tuple_pool_.get());
+  dst_row->SetTuple(0, dst_tuple);
+  dst_batch->CommitLastRow();  // 'num_rows_returned_' is updated by GetNext(), not here.
+}
+
+void UnionNode::MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf) {
+  // Take all references to member variables out of the loop to reduce the number of
+  // loads and stores.
+  RowBatch* child_batch = child_batch_.get();
+  int tuple_byte_size = tuple_desc_->byte_size();
+  uint8_t* cur_tuple = *tuple_buf;
+  const std::vector<ExprContext*>& child_exprs = child_expr_lists_[child_idx_];
+  // Process as many unconsumed child rows as fit in the remaining rows of 'dst_batch'.
+  int num_rows_to_process = std::min(child_batch->num_rows() - child_row_idx_,
+      dst_batch->capacity() - dst_batch->num_rows());
+  FOREACH_ROW_LIMIT(child_batch, child_row_idx_, num_rows_to_process, batch_iter) {
+    TupleRow* child_row = batch_iter.Get();
+    MaterializeExprs(child_exprs, child_row, cur_tuple, dst_batch);
+    cur_tuple += tuple_byte_size;  // Each output tuple gets the next fixed-size slot.
+  }
+
+  child_row_idx_ += num_rows_to_process;  // The next call resumes from this row.
+  *tuple_buf = cur_tuple;  // Hand the next free tuple slot back to the caller.
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index bff7435..0da1760 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "codegen/llvm-codegen.h"
 #include "exec/union-node.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
@@ -27,7 +28,8 @@
 
 #include "common/names.h"
 
-namespace impala {
+using namespace llvm;
+using namespace impala;
 
 UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
@@ -35,6 +37,7 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
       tuple_id_(tnode.union_node.tuple_id),
       tuple_desc_(nullptr),
       
first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
+      tuple_pool_(nullptr),
       child_idx_(0),
       child_batch_(nullptr),
       child_row_idx_(0),
@@ -68,6 +71,8 @@ Status UnionNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
   DCHECK(tuple_desc_ != nullptr);
+  tuple_pool_.reset(new MemPool(mem_tracker()));
+  codegend_union_materialize_batch_fns_.resize(child_expr_lists_.size());
 
   // Prepare const expr lists.
   for (const vector<ExprContext*>& exprs : const_expr_lists_) {
@@ -86,6 +91,50 @@ Status UnionNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
+void UnionNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;  // All entries stay nullptr.
+
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  std::stringstream codegen_message;
+  Status codegen_status;
+  for (int i = 0; i < child_expr_lists_.size(); ++i) {
+    if (IsChildPassthrough(i)) continue;  // Passthrough rows are never materialized.
+
+    llvm::Function* tuple_materialize_exprs_fn;
+    codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, *tuple_desc_,
+        child_expr_lists_[i], tuple_pool_.get(), &tuple_materialize_exprs_fn);
+    if (!codegen_status.ok()) {
+      // Codegen may fail in some corner cases (e.g. we don't handle TYPE_CHAR). If this
+      // happens, abort codegen for this and the remaining children.
+      codegen_message << "Codegen failed for child: " << children_[i]->id();
+      break;  // Entries for child i onwards stay nullptr and use MaterializeBatch().
+    }
+
+    // Get a copy of the function. This function will be modified and added to the
+    // vector of functions.
+    Function* union_materialize_batch_fn =
+        codegen->GetFunction(IRFunction::UNION_MATERIALIZE_BATCH, true);
+    DCHECK(union_materialize_batch_fn != nullptr);
+    // Replace the single call to Tuple::MaterializeExprs() with this child's version.
+    int replaced = codegen->ReplaceCallSites(union_materialize_batch_fn,
+        tuple_materialize_exprs_fn, Tuple::MATERIALIZE_EXPRS_SYMBOL);
+    DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(union_materialize_batch_fn);
+
+    union_materialize_batch_fn = codegen->FinalizeFunction(
+        union_materialize_batch_fn);
+    DCHECK(union_materialize_batch_fn != nullptr);
+    // NOTE(review): if FinalizeFunction() can return nullptr, only debug builds catch it.
+    // Add the function to Jit and to the vector of codegened functions.
+    codegen->AddFunctionToJit(union_materialize_batch_fn,
+        reinterpret_cast<void**>(&(codegend_union_materialize_batch_fns_.data()[i])));
+  }
+  runtime_profile()->AddCodegenMsg(  // Records the failing child's status, if any.
+      codegen_status.ok(), codegen_status, codegen_message.str());
+}
+
 Status UnionNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
@@ -114,12 +163,6 @@ Status UnionNode::GetNextPassThrough(RuntimeState* state, 
RowBatch* row_batch) {
   if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
   DCHECK_EQ(row_batch->num_rows(), 0);
   RETURN_IF_ERROR(child(child_idx_)->GetNext(state, row_batch, &child_eos_));
-  if (limit_ != -1 && num_rows_returned_ + row_batch->num_rows() > limit_) {
-    row_batch->set_num_rows(limit_ - num_rows_returned_);
-  }
-  num_rows_returned_ += row_batch->num_rows();
-  DCHECK(limit_ == -1 || num_rows_returned_ <= limit_);
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   if (child_eos_) {
     // Even though the child is at eos, it's not OK to Close() it here. Once 
we close
     // the child, the row batches that it produced are invalid. Marking the 
batch as
@@ -143,10 +186,8 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state, 
RowBatch* row_batch)
   memset(tuple_buf, 0, tuple_buf_size);
 
   while (HasMoreMaterialized() && !row_batch->AtCapacity()) {
-    // There are only 2 ways of getting out of this loop:
-    // 1. The loop ends normally when we are either done iterating over the 
children that
-    //    need materialization or the row batch is at capacity.
-    // 2. We return from the function from inside the loop if limit is reached.
+    // The loop runs until we are either done iterating over the children that require
+    // materialization, or the row batch is at capacity.
     DCHECK(!IsChildPassthrough(child_idx_));
     // Child row batch was either never set or we're moving on to a different 
child.
     if (child_batch_.get() == nullptr) {
@@ -163,12 +204,6 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state, 
RowBatch* row_batch)
     }
 
     while (!row_batch->AtCapacity()) {
-      // This loop fetches row batches from a single child and materializes 
each output
-      // row, until one of these conditions:
-      // 1. The loop ends normally if the row batch is at capacity.
-      // 2. We break out of the loop if all the rows were consumed from the 
current child
-      //    and we are moving on to the next child.
-      // 3. We return from the function from inside the loop if the limit is 
reached.
       DCHECK(child_batch_.get() != nullptr);
       DCHECK_LE(child_row_idx_, child_batch_->num_rows());
       if (child_row_idx_ == child_batch_->num_rows()) {
@@ -184,21 +219,17 @@ Status UnionNode::GetNextMaterialized(RuntimeState* 
state, RowBatch* row_batch)
         // try again.
         if (child_batch_->num_rows() == 0) continue;
       }
-      DCHECK_LT(child_row_idx_, child_batch_->num_rows());
-      TupleRow* child_row = child_batch_->GetRow(child_row_idx_);
-      MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf, 
row_batch);
-      tuple_buf += tuple_desc_->byte_size();
-      ++child_row_idx_;
-      if (ReachedLimit()) {
-        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-        // It's OK to close the child here even if we are inside a subplan.
-        child_batch_.reset();
-        child(child_idx_)->Close(state);
-        return Status::OK();
+      DCHECK_EQ(codegend_union_materialize_batch_fns_.size(), 
children_.size());
+      if (codegend_union_materialize_batch_fns_[child_idx_] == nullptr) {
+        MaterializeBatch(row_batch, &tuple_buf);
+      } else {
+        codegend_union_materialize_batch_fns_[child_idx_](this, row_batch, 
&tuple_buf);
       }
     }
-
+    // The limit can't have been reached here: 'num_rows_returned_' is only updated in
+    // GetNext(), after this function returns.
     DCHECK(!ReachedLimit());
+
     if (child_eos_ && child_row_idx_ == child_batch_->num_rows()) {
       // Unless we are inside a subplan expecting to call Open()/GetNext() on 
the child
       // again, the child can be closed at this point.
@@ -225,15 +256,14 @@ Status UnionNode::GetNextConst(RuntimeState* state, 
RowBatch* row_batch) {
   RETURN_IF_ERROR(
       row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, 
&tuple_buf));
   memset(tuple_buf, 0, tuple_buf_size);
-  while (const_expr_list_idx_ < const_expr_lists_.size() &&
-      !row_batch->AtCapacity() && !ReachedLimit()) {
+
+  while (const_expr_list_idx_ < const_expr_lists_.size() && 
!row_batch->AtCapacity()) {
     MaterializeExprs(
         const_expr_lists_[const_expr_list_idx_], nullptr, tuple_buf, 
row_batch);
     tuple_buf += tuple_desc_->byte_size();
     ++const_expr_list_idx_;
   }
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
@@ -242,15 +272,22 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
+  // The tuple pool should be empty between GetNext() calls.
+  DCHECK_EQ(tuple_pool_.get()->GetTotalChunkSizes(), 0);
 
   if (to_close_child_idx_ != -1) {
     // The previous child needs to be closed if passthrough was enabled for 
it. In the non
     // passthrough case, the child was already closed in the previous call to 
GetNext().
     DCHECK(IsChildPassthrough(to_close_child_idx_));
+    DCHECK(!IsInSubplan());
     child(to_close_child_idx_)->Close(state);
     to_close_child_idx_ = -1;
   }
 
+  // Save the number of rows in case GetNext() is called with a non-empty batch, which can
+  // happen in a subplan.
+  int num_rows_before = row_batch->num_rows();
+
   if (HasMorePassthrough()) {
     RETURN_IF_ERROR(GetNextPassThrough(state, row_batch));
   } else if (HasMoreMaterialized()) {
@@ -259,32 +296,33 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
     RETURN_IF_ERROR(GetNextConst(state, row_batch));
   }
 
+  int num_rows_added = row_batch->num_rows() - num_rows_before;
+  DCHECK_GE(num_rows_added, 0);
+  if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+    // Truncate the row batch if we went over the limit.
+    num_rows_added = limit_ - num_rows_returned_;
+    row_batch->set_num_rows(num_rows_before + num_rows_added);
+    DCHECK_GE(num_rows_added, 0);
+  }
+  num_rows_returned_ += num_rows_added;
+
   *eos = ReachedLimit() ||
       (!HasMorePassthrough() && !HasMoreMaterialized() && 
!HasMoreConst(state));
 
+  // Attach the memory in the tuple pool (if any) to the row batch.
+  row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
-void UnionNode::MaterializeExprs(const vector<ExprContext*>& exprs,
-    TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
-  DCHECK(!dst_batch->AtCapacity());
-  Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
-  TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
-  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_,
-      exprs, dst_batch->tuple_data_pool());
-  dst_row->SetTuple(0, dst_tuple);
-  dst_batch->CommitLastRow();
-  ++num_rows_returned_;
-}
-
 Status UnionNode::Reset(RuntimeState* state) {
   child_idx_ = 0;
   child_batch_.reset();
   child_row_idx_ = 0;
   child_eos_ = false;
   const_expr_list_idx_ = 0;
-  // Since passthrough is disabled in subplans, verify that there is no 
passthrough
-  // child that needs to be closed.
+  // Since passthrough is disabled in subplans, verify that there is no 
passthrough child
+  // that needs to be closed.
   DCHECK_EQ(to_close_child_idx_, -1);
   return ExecNode::Reset(state);
 }
@@ -298,7 +336,6 @@ void UnionNode::Close(RuntimeState* state) {
   for (const vector<ExprContext*>& exprs : child_expr_lists_) {
     Expr::Close(exprs, state);
   }
+  if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
   ExecNode::Close(state);
 }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index e1474d0..b1715f9 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -21,6 +21,7 @@
 
 #include <boost/scoped_ptr.hpp>
 
+#include "codegen/impala-ir.h"
 #include "exec/exec-node.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -46,6 +47,7 @@ class UnionNode : public ExecNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -63,6 +65,13 @@ class UnionNode : public ExecNode {
   /// materialized.
   const int first_materialized_child_idx_;
 
+  /// Used by MaterializeExprs() to materialize var-len slots. Ownership of the memory in
+  /// this pool is transferred to the output row batch at the end of each GetNext()
+  /// call. The memory can't be attached to the row batch in MaterializeExprs() because
+  /// the pointer to the mem pool is hard-coded in the codegen'ed MaterializeExprs().
+  /// TODO (IMPALA-5192): Remove this when no longer necessary in the future.
+  boost::scoped_ptr<MemPool> tuple_pool_;
+
   /// Const exprs materialized by this node. These exprs don't refer to any 
children.
   /// Only materialized by the first fragment instance to avoid duplication.
   std::vector<std::vector<ExprContext*>> const_expr_lists_;
@@ -83,6 +92,13 @@ class UnionNode : public ExecNode {
   /// Index of current row in child_row_batch_.
   int child_row_idx_;
 
+  typedef void (*UnionMaterializeBatchFn)(UnionNode*, RowBatch*, uint8_t**);
+  /// Vector of pointers to codegen'ed MaterializeBatch functions, one per child; its size
+  /// equals the number of children. An entry is NULL if the child is passthrough, if
+  /// codegen is disabled, or if codegen failed for that child or an earlier one; NULL
+  /// entries fall back to the non-codegen'd MaterializeBatch().
+  std::vector<UnionMaterializeBatchFn> codegend_union_materialize_batch_fns_;
+
   /// Saved from the last to GetNext() on the current child.
   bool child_eos_;
 
@@ -96,6 +112,9 @@ class UnionNode : public ExecNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
+  /// The following GetNext* functions don't apply the limit. It must be enforced by the
+  /// caller.
+
   /// GetNext() for the passthrough case. We pass 'row_batch' directly into 
the GetNext()
   /// call on the child.
   Status GetNextPassThrough(RuntimeState* state, RowBatch* row_batch);
@@ -107,31 +126,36 @@ class UnionNode : public ExecNode {
   /// GetNext() for the constant expression case.
   Status GetNextConst(RuntimeState* state, RowBatch* row_batch);
 
+  /// Evaluates the current child's exprs and materializes the results into '*tuple_buf',
+  /// which is attached to 'dst_batch'. Runs until 'dst_batch' is at capacity or all rows
+  /// of the current child batch are consumed. Advances 'child_row_idx_' and '*tuple_buf'.
+  void MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf);
+
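-  /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'.
-  /// and appends the new tuple to 'dst_batch'. Increments 'num_rows_returned_'.
+  /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf', and
+  /// appends the new tuple to 'dst_batch'. Does not update 'num_rows_returned_'.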
-  inline void MaterializeExprs(const std::vector<ExprContext*>& exprs,
+  void MaterializeExprs(const std::vector<ExprContext*>& exprs,
       TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch);
 
   /// Returns true if the child at 'child_idx' can be passed through.
-  inline bool IsChildPassthrough(int child_idx) const {
+  bool IsChildPassthrough(int child_idx) const {
     DCHECK_LT(child_idx, children_.size());
     return child_idx < first_materialized_child_idx_;
   }
 
   /// Returns true if there are still rows to be returned from passthrough 
children.
-  inline bool HasMorePassthrough() const {
+  bool HasMorePassthrough() const {
     return child_idx_ < first_materialized_child_idx_;
   }
 
   /// Returns true if there are still rows to be returned from children that 
need
   /// materialization.
-  inline bool HasMoreMaterialized() const {
+  bool HasMoreMaterialized() const {
     return first_materialized_child_idx_ != children_.size() &&
         child_idx_ < children_.size();
   }
 
   /// Returns true if there are still rows to be returned from constant 
expressions.
-  inline bool HasMoreConst(const RuntimeState* state) const {
+  bool HasMoreConst(const RuntimeState* state) const {
     return state->instance_ctx().per_fragment_instance_idx == 0 &&
         const_expr_list_idx_ < const_expr_lists_.size();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
 
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index fa494ab..90c925d 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -580,9 +580,10 @@ BIGINT,BIGINT
 # IMPALA-3678: union in a subplan - passthrough should be disabled.
 select count(c.c_custkey), count(v.tot_price)
 from tpch_nested_parquet.customer c, (
-select sum(o_totalprice) tot_price from c.c_orders
-union
-select sum(o_totalprice) tot_price from c.c_orders) v;
+  select sum(o_totalprice) tot_price from c.c_orders
+  union
+  select sum(o_totalprice) tot_price from c.c_orders
+) v;
 ---- RESULTS
 150000,99996
 ---- TYPES

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/testdata/workloads/functional-query/queries/QueryTest/union.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/union.test 
b/testdata/workloads/functional-query/queries/QueryTest/union.test
index ae75c12..5783c24 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/union.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/union.test
@@ -1121,3 +1121,32 @@ select t1.bigint_col from alltypestiny t1 inner join
 ---- TYPES
 bigint
 =====
+---- QUERY
+# IMPALA-4883: The second union operand references a char column, which causes codegen
+# to fail; codegen is then disabled for that operand and all operands that follow it,
+# so for this query codegen is enabled only for the first operand.
+select count(s) from (
+  select cast(id as string) as s from alltypestiny
+  union all
+  select cast(cl as string) as s from functional.chars_tiny
+  union all
+  select cast(id as string) as s from alltypestiny
+) t
+---- RESULTS
+24
+---- TYPES
+bigint
+=====
+---- QUERY
+# IMPALA-4883: Verify the union's limit (7) truncates the 4 + 4 rows from its operands.
+select count(b) from (
+  select bigint_col as b from alltypestiny limit 4
+  union all
+  (select bigint_col as b from alltypestiny limit 4)
+  limit 7
+) t
+---- RESULTS
+7
+---- TYPES
+bigint
+=====

Reply via email to