Repository: incubator-impala
Updated Branches:
refs/heads/master baba8960b -> 58b206ff0
IMPALA-4883: Union Codegen
For each non-passthrough child of the Union node, codegen the loop that
does per-row tuple materialization.
Testing:
Ran test_queries.py test locally in exhaustive mode.
Benchmark:
Ran a benchmark on a local 10 GB TPC-DS dataset, using an unpartitioned
store_sales table.
SELECT
COUNT(c),
COUNT(ss_customer_sk),
COUNT(ss_cdemo_sk),
COUNT(ss_hdemo_sk),
COUNT(ss_addr_sk),
COUNT(ss_store_sk),
COUNT(ss_promo_sk),
COUNT(ss_ticket_number),
COUNT(ss_quantity),
COUNT(ss_wholesale_cost),
COUNT(ss_list_price),
COUNT(ss_sales_price),
COUNT(ss_ext_discount_amt),
COUNT(ss_ext_sales_price),
COUNT(ss_ext_wholesale_cost),
COUNT(ss_ext_list_price),
COUNT(ss_ext_tax),
COUNT(ss_coupon_amt),
COUNT(ss_net_paid),
COUNT(ss_net_paid_inc_tax),
COUNT(ss_net_profit),
COUNT(ss_sold_date_sk)
FROM (
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
union all
select fnv_hash(ss_sold_time_sk) c, * from
tpcds_10_parquet.store_sales_unpartitioned
) t
Before: 39s704ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE 1 194.504us 194.504us 1 1 28.00 KB -1.00 B FINALIZE
12:EXCHANGE 1 17.284us 17.284us 3 1 0 -1.00 B UNPARTITIONED
11:AGGREGATE 3 2s202ms 2s934ms 3 1 115.00 KB 10.00 MB
00:UNION 3 32s514ms 34s926ms 288.01M 288.01M 3.08 MB 0
|--02:SCAN HDFS 3 158.373ms 216.085ms 28.80M 28.80M 489.71 MB 1.88 GB tpcds_10_parquet.store_sales
|--03:SCAN HDFS 3 167.002ms 171.738ms 28.80M 28.80M 489.74 MB 1.88 GB tpcds_10_parquet.store_sales
|--04:SCAN HDFS 3 125.331ms 145.496ms 28.80M 28.80M 489.57 MB 1.88 GB tpcds_10_parquet.store_sales
|--05:SCAN HDFS 3 148.478ms 194.311ms 28.80M 28.80M 489.69 MB 1.88 GB tpcds_10_parquet.store_sales
|--06:SCAN HDFS 3 143.995ms 162.781ms 28.80M 28.80M 489.57 MB 1.88 GB tpcds_10_parquet.store_sales
|--07:SCAN HDFS 3 169.731ms 250.201ms 28.80M 28.80M 489.58 MB 1.88 GB tpcds_10_parquet.store_sales
|--08:SCAN HDFS 3 164.110ms 254.374ms 28.80M 28.80M 489.61 MB 1.88 GB tpcds_10_parquet.store_sales
|--09:SCAN HDFS 3 135.631ms 162.117ms 28.80M 28.80M 489.63 MB 1.88 GB tpcds_10_parquet.store_sales
|--10:SCAN HDFS 3 138.736ms 167.778ms 28.80M 28.80M 489.67 MB 1.88 GB tpcds_10_parquet.store_sales
01:SCAN HDFS 3 202.015ms 248.728ms 28.80M 28.80M 489.68 MB 1.88 GB tpcds_10_parquet.store_sales
After: 20s177ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
------------------------------------------------------------------------------------------------------------------------------
13:AGGREGATE 1 174.617us 174.617us 1 1 28.00 KB -1.00 B FINALIZE
12:EXCHANGE 1 16.693us 16.693us 3 1 0 -1.00 B UNPARTITIONED
11:AGGREGATE 3 2s830ms 3s615ms 3 1 115.00 KB 10.00 MB
00:UNION 3 4s296ms 5s258ms 288.01M 288.01M 3.08 MB 0
|--02:SCAN HDFS 3 1s212ms 1s340ms 28.80M 28.80M 488.82 MB 1.88 GB tpcds_10_parquet.store_sales
|--03:SCAN HDFS 3 1s387ms 1s570ms 28.80M 28.80M 489.37 MB 1.88 GB tpcds_10_parquet.store_sales
|--04:SCAN HDFS 3 1s224ms 1s347ms 28.80M 28.80M 487.22 MB 1.88 GB tpcds_10_parquet.store_sales
|--05:SCAN HDFS 3 1s245ms 1s321ms 28.80M 28.80M 489.25 MB 1.88 GB tpcds_10_parquet.store_sales
|--06:SCAN HDFS 3 1s232ms 1s505ms 28.80M 28.80M 484.21 MB 1.88 GB tpcds_10_parquet.store_sales
|--07:SCAN HDFS 3 1s348ms 1s518ms 28.80M 28.80M 488.20 MB 1.88 GB tpcds_10_parquet.store_sales
|--08:SCAN HDFS 3 1s231ms 1s335ms 28.80M 28.80M 483.58 MB 1.88 GB tpcds_10_parquet.store_sales
|--09:SCAN HDFS 3 1s179ms 1s349ms 28.80M 28.80M 482.76 MB 1.88 GB tpcds_10_parquet.store_sales
|--10:SCAN HDFS 3 1s121ms 1s154ms 28.80M 28.80M 486.59 MB 1.88 GB tpcds_10_parquet.store_sales
01:SCAN HDFS 3 1s284ms 1s523ms 28.80M 28.80M 486.70 MB 1.88 GB tpcds_10_parquet.store_sales
Change-Id: Ib4107d27582ff5416172810364a6e76d3d93c439
Reviewed-on: http://gerrit.cloudera.org:8080/6459
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/75553165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/75553165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/75553165
Branch: refs/heads/master
Commit: 75553165eed6c5decaa7fd0bfa3ae4d537aeb7ff
Parents: baba896
Author: Taras Bobrovytsky <[email protected]>
Authored: Tue Mar 21 18:21:23 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Apr 21 04:53:09 2017 +0000
----------------------------------------------------------------------
be/src/codegen/gen_ir_descriptions.py | 4 +-
be/src/codegen/impala-ir.cc | 1 +
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/union-node-ir.cc | 51 +++++++
be/src/exec/union-node.cc | 133 ++++++++++++-------
be/src/exec/union-node.h | 34 ++++-
.../queries/QueryTest/nested-types-subplan.test | 7 +-
.../queries/QueryTest/union.test | 29 ++++
8 files changed, 203 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py
b/be/src/codegen/gen_ir_descriptions.py
index 4b62dfe..8a82218 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -214,7 +214,9 @@ ir_functions = [
["MEMPOOL_CHECKED_ALLOCATE",
"_ZN6impala7MemPool8AllocateILb1EEEPhli"],
["RUNTIME_FILTER_EVAL",
- "_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE"]
+ "_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE"],
+ ["UNION_MATERIALIZE_BATCH",
+ "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"]
]
enums_preamble = '\
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 5ff816b..6c8a9d5 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -37,6 +37,7 @@
#include "exec/partitioned-hash-join-builder-ir.cc"
#include "exec/partitioned-hash-join-node-ir.cc"
#include "exec/topn-node-ir.cc"
+#include "exec/union-node-ir.cc"
#include "exprs/aggregate-functions-ir.cc"
#include "exprs/bit-byte-functions-ir.cc"
#include "exprs/cast-functions-ir.cc"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1193660..8632790 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(Exec
topn-node.cc
topn-node-ir.cc
union-node.cc
+ union-node-ir.cc
unnest-node.cc
)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node-ir.cc b/be/src/exec/union-node-ir.cc
new file mode 100644
index 0000000..38dfc97
--- /dev/null
+++ b/be/src/exec/union-node-ir.cc
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/union-node.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(const
std::vector<ExprContext*>& exprs,
+ TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
+ DCHECK(!dst_batch->AtCapacity());
+ Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+ TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
+ dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs,
tuple_pool_.get());
+ dst_row->SetTuple(0, dst_tuple);
+ dst_batch->CommitLastRow();
+}
+
+void UnionNode::MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf) {
+ // Take all references to member variables out of the loop to reduce the
number of
+ // loads and stores.
+ RowBatch* child_batch = child_batch_.get();
+ int tuple_byte_size = tuple_desc_->byte_size();
+ uint8_t* cur_tuple = *tuple_buf;
+ const std::vector<ExprContext*>& child_exprs = child_expr_lists_[child_idx_];
+
+ int num_rows_to_process = std::min(child_batch->num_rows() - child_row_idx_,
+ dst_batch->capacity() - dst_batch->num_rows());
+ FOREACH_ROW_LIMIT(child_batch, child_row_idx_, num_rows_to_process,
batch_iter) {
+ TupleRow* child_row = batch_iter.Get();
+ MaterializeExprs(child_exprs, child_row, cur_tuple, dst_batch);
+ cur_tuple += tuple_byte_size;
+ }
+
+ child_row_idx_ += num_rows_to_process;
+ *tuple_buf = cur_tuple;
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index bff7435..0da1760 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include "codegen/llvm-codegen.h"
#include "exec/union-node.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
@@ -27,7 +28,8 @@
#include "common/names.h"
-namespace impala {
+using namespace llvm;
+using namespace impala;
UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
@@ -35,6 +37,7 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
tuple_id_(tnode.union_node.tuple_id),
tuple_desc_(nullptr),
first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
+ tuple_pool_(nullptr),
child_idx_(0),
child_batch_(nullptr),
child_row_idx_(0),
@@ -68,6 +71,8 @@ Status UnionNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Prepare(state));
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
DCHECK(tuple_desc_ != nullptr);
+ tuple_pool_.reset(new MemPool(mem_tracker()));
+ codegend_union_materialize_batch_fns_.resize(child_expr_lists_.size());
// Prepare const expr lists.
for (const vector<ExprContext*>& exprs : const_expr_lists_) {
@@ -86,6 +91,50 @@ Status UnionNode::Prepare(RuntimeState* state) {
return Status::OK();
}
+void UnionNode::Codegen(RuntimeState* state) {
+ DCHECK(state->ShouldCodegen());
+ ExecNode::Codegen(state);
+ if (IsNodeCodegenDisabled()) return;
+
+ LlvmCodeGen* codegen = state->codegen();
+ DCHECK(codegen != nullptr);
+ std::stringstream codegen_message;
+ Status codegen_status;
+ for (int i = 0; i < child_expr_lists_.size(); ++i) {
+ if (IsChildPassthrough(i)) continue;
+
+ llvm::Function* tuple_materialize_exprs_fn;
+ codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
*tuple_desc_,
+ child_expr_lists_[i], tuple_pool_.get(), &tuple_materialize_exprs_fn);
+ if (!codegen_status.ok()) {
+ // Codegen may fail in some corner cases (e.g. we don't handle
TYPE_CHAR). If this
+ // happens, abort codegen for this and the remaining children.
+ codegen_message << "Codegen failed for child: " << children_[i]->id();
+ break;
+ }
+
+ // Get a copy of the function. This function will be modified and added to
the
+ // vector of functions.
+ Function* union_materialize_batch_fn =
+ codegen->GetFunction(IRFunction::UNION_MATERIALIZE_BATCH, true);
+ DCHECK(union_materialize_batch_fn != nullptr);
+
+ int replaced = codegen->ReplaceCallSites(union_materialize_batch_fn,
+ tuple_materialize_exprs_fn, Tuple::MATERIALIZE_EXPRS_SYMBOL);
+ DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(union_materialize_batch_fn);
+
+ union_materialize_batch_fn = codegen->FinalizeFunction(
+ union_materialize_batch_fn);
+ DCHECK(union_materialize_batch_fn != nullptr);
+
+ // Add the function to Jit and to the vector of codegened functions.
+ codegen->AddFunctionToJit(union_materialize_batch_fn,
+
reinterpret_cast<void**>(&(codegend_union_materialize_batch_fns_.data()[i])));
+ }
+ runtime_profile()->AddCodegenMsg(
+ codegen_status.ok(), codegen_status, codegen_message.str());
+}
+
Status UnionNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(ExecNode::Open(state));
@@ -114,12 +163,6 @@ Status UnionNode::GetNextPassThrough(RuntimeState* state,
RowBatch* row_batch) {
if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
DCHECK_EQ(row_batch->num_rows(), 0);
RETURN_IF_ERROR(child(child_idx_)->GetNext(state, row_batch, &child_eos_));
- if (limit_ != -1 && num_rows_returned_ + row_batch->num_rows() > limit_) {
- row_batch->set_num_rows(limit_ - num_rows_returned_);
- }
- num_rows_returned_ += row_batch->num_rows();
- DCHECK(limit_ == -1 || num_rows_returned_ <= limit_);
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
if (child_eos_) {
// Even though the child is at eos, it's not OK to Close() it here. Once
we close
// the child, the row batches that it produced are invalid. Marking the
batch as
@@ -143,10 +186,8 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state,
RowBatch* row_batch)
memset(tuple_buf, 0, tuple_buf_size);
while (HasMoreMaterialized() && !row_batch->AtCapacity()) {
- // There are only 2 ways of getting out of this loop:
- // 1. The loop ends normally when we are either done iterating over the
children that
- // need materialization or the row batch is at capacity.
- // 2. We return from the function from inside the loop if limit is reached.
+ // The loop runs until we are either done iterating over the children that
require
+ // materialization, or the row batch is at capacity.
DCHECK(!IsChildPassthrough(child_idx_));
// Child row batch was either never set or we're moving on to a different
child.
if (child_batch_.get() == nullptr) {
@@ -163,12 +204,6 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state,
RowBatch* row_batch)
}
while (!row_batch->AtCapacity()) {
- // This loop fetches row batches from a single child and materializes
each output
- // row, until one of these conditions:
- // 1. The loop ends normally if the row batch is at capacity.
- // 2. We break out of the loop if all the rows were consumed from the
current child
- // and we are moving on to the next child.
- // 3. We return from the function from inside the loop if the limit is
reached.
DCHECK(child_batch_.get() != nullptr);
DCHECK_LE(child_row_idx_, child_batch_->num_rows());
if (child_row_idx_ == child_batch_->num_rows()) {
@@ -184,21 +219,17 @@ Status UnionNode::GetNextMaterialized(RuntimeState*
state, RowBatch* row_batch)
// try again.
if (child_batch_->num_rows() == 0) continue;
}
- DCHECK_LT(child_row_idx_, child_batch_->num_rows());
- TupleRow* child_row = child_batch_->GetRow(child_row_idx_);
- MaterializeExprs(child_expr_lists_[child_idx_], child_row, tuple_buf,
row_batch);
- tuple_buf += tuple_desc_->byte_size();
- ++child_row_idx_;
- if (ReachedLimit()) {
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- // It's OK to close the child here even if we are inside a subplan.
- child_batch_.reset();
- child(child_idx_)->Close(state);
- return Status::OK();
+ DCHECK_EQ(codegend_union_materialize_batch_fns_.size(),
children_.size());
+ if (codegend_union_materialize_batch_fns_[child_idx_] == nullptr) {
+ MaterializeBatch(row_batch, &tuple_buf);
+ } else {
+ codegend_union_materialize_batch_fns_[child_idx_](this, row_batch,
&tuple_buf);
}
}
-
+ // It shouldn't be the case that we reached the limit because we shouldn't
have
+ // incremented 'num_rows_returned_' yet.
DCHECK(!ReachedLimit());
+
if (child_eos_ && child_row_idx_ == child_batch_->num_rows()) {
// Unless we are inside a subplan expecting to call Open()/GetNext() on
the child
// again, the child can be closed at this point.
@@ -225,15 +256,14 @@ Status UnionNode::GetNextConst(RuntimeState* state,
RowBatch* row_batch) {
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size,
&tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
- while (const_expr_list_idx_ < const_expr_lists_.size() &&
- !row_batch->AtCapacity() && !ReachedLimit()) {
+
+ while (const_expr_list_idx_ < const_expr_lists_.size() &&
!row_batch->AtCapacity()) {
MaterializeExprs(
const_expr_lists_[const_expr_list_idx_], nullptr, tuple_buf,
row_batch);
tuple_buf += tuple_desc_->byte_size();
++const_expr_list_idx_;
}
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
return Status::OK();
}
@@ -242,15 +272,22 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch*
row_batch, bool* eos) {
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
+ // The tuple pool should be empty between GetNext() calls.
+ DCHECK_EQ(tuple_pool_.get()->GetTotalChunkSizes(), 0);
if (to_close_child_idx_ != -1) {
// The previous child needs to be closed if passthrough was enabled for
it. In the non
// passthrough case, the child was already closed in the previous call to
GetNext().
DCHECK(IsChildPassthrough(to_close_child_idx_));
+ DCHECK(!IsInSubplan());
child(to_close_child_idx_)->Close(state);
to_close_child_idx_ = -1;
}
+ // Save the number of rows in case GetNext() is called with a non-empty
batch, which can
+ // happen in a subplan.
+ int num_rows_before = row_batch->num_rows();
+
if (HasMorePassthrough()) {
RETURN_IF_ERROR(GetNextPassThrough(state, row_batch));
} else if (HasMoreMaterialized()) {
@@ -259,32 +296,33 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch*
row_batch, bool* eos) {
RETURN_IF_ERROR(GetNextConst(state, row_batch));
}
+ int num_rows_added = row_batch->num_rows() - num_rows_before;
+ DCHECK_GE(num_rows_added, 0);
+ if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+ // Truncate the row batch if we went over the limit.
+ num_rows_added = limit_ - num_rows_returned_;
+ row_batch->set_num_rows(num_rows_before + num_rows_added);
+ DCHECK_GE(num_rows_added, 0);
+ }
+ num_rows_returned_ += num_rows_added;
+
*eos = ReachedLimit() ||
(!HasMorePassthrough() && !HasMoreMaterialized() &&
!HasMoreConst(state));
+ // Attach the memory in the tuple pool (if any) to the row batch.
+ row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
+ COUNTER_SET(rows_returned_counter_, num_rows_returned_);
return Status::OK();
}
-void UnionNode::MaterializeExprs(const vector<ExprContext*>& exprs,
- TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
- DCHECK(!dst_batch->AtCapacity());
- Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
- TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
- dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_,
- exprs, dst_batch->tuple_data_pool());
- dst_row->SetTuple(0, dst_tuple);
- dst_batch->CommitLastRow();
- ++num_rows_returned_;
-}
-
Status UnionNode::Reset(RuntimeState* state) {
child_idx_ = 0;
child_batch_.reset();
child_row_idx_ = 0;
child_eos_ = false;
const_expr_list_idx_ = 0;
- // Since passthrough is disabled in subplans, verify that there is no
passthrough
- // child that needs to be closed.
+ // Since passthrough is disabled in subplans, verify that there is no
passthrough child
+ // that needs to be closed.
DCHECK_EQ(to_close_child_idx_, -1);
return ExecNode::Reset(state);
}
@@ -298,7 +336,6 @@ void UnionNode::Close(RuntimeState* state) {
for (const vector<ExprContext*>& exprs : child_expr_lists_) {
Expr::Close(exprs, state);
}
+ if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
ExecNode::Close(state);
}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index e1474d0..b1715f9 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -21,6 +21,7 @@
#include <boost/scoped_ptr.hpp>
+#include "codegen/impala-ir.h"
#include "exec/exec-node.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -46,6 +47,7 @@ class UnionNode : public ExecNode {
virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
virtual Status Prepare(RuntimeState* state);
+ virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state);
@@ -63,6 +65,13 @@ class UnionNode : public ExecNode {
/// materialized.
const int first_materialized_child_idx_;
+ /// Used by MaterializeExprs() to materialize var-len slots. The ownership
of the memory
+ /// in this pool should be transferred to the row batch at the end of each
GetNext()
+ /// call. The memory can't be attached to the row batch in
MaterializeExprs() because
+ /// the pointer to the mem pool is hard coded in the codegen'ed
MaterializeExprs().
+ /// TODO (IMPALA-5192): Remove this when no longer necessary in the future.
+ boost::scoped_ptr<MemPool> tuple_pool_;
+
/// Const exprs materialized by this node. These exprs don't refer to any
children.
/// Only materialized by the first fragment instance to avoid duplication.
std::vector<std::vector<ExprContext*>> const_expr_lists_;
@@ -83,6 +92,13 @@ class UnionNode : public ExecNode {
/// Index of current row in child_row_batch_.
int child_row_idx_;
+ typedef void (*UnionMaterializeBatchFn)(UnionNode*, RowBatch*, uint8_t**);
+ /// Vector of pointers to codegen'ed MaterializeBatch functions. The vector
contains one
+ /// function for each child. The size of the vector should be equal to the
number of
+ /// children. If a child is passthrough, there should be a NULL for that
child. If
+ /// Codegen is disabled, there should be a NULL for every child.
+ std::vector<UnionMaterializeBatchFn> codegend_union_materialize_batch_fns_;
+
/// Saved from the last to GetNext() on the current child.
bool child_eos_;
@@ -96,6 +112,9 @@ class UnionNode : public ExecNode {
/// END: Members that must be Reset()
/////////////////////////////////////////
+ /// The following GetNext* functions don't apply the limit. It must be
enforced by the
+ /// caller.
+
/// GetNext() for the passthrough case. We pass 'row_batch' directly into
the GetNext()
/// call on the child.
Status GetNextPassThrough(RuntimeState* state, RowBatch* row_batch);
@@ -107,31 +126,36 @@ class UnionNode : public ExecNode {
/// GetNext() for the constant expression case.
Status GetNextConst(RuntimeState* state, RowBatch* row_batch);
+ /// Evaluates exprs for the current child and materializes the results into
'tuple_buf',
+ /// which is attached to 'dst_batch'. Runs until 'dst_batch' is at capacity,
or all rows
+ /// have been consumed from the current child batch. Updates
'child_row_idx_'.
+ void MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf);
+
/// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'.
/// and appends the new tuple to 'dst_batch'. Increments
'num_rows_returned_'.
- inline void MaterializeExprs(const std::vector<ExprContext*>& exprs,
+ void MaterializeExprs(const std::vector<ExprContext*>& exprs,
TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch);
/// Returns true if the child at 'child_idx' can be passed through.
- inline bool IsChildPassthrough(int child_idx) const {
+ bool IsChildPassthrough(int child_idx) const {
DCHECK_LT(child_idx, children_.size());
return child_idx < first_materialized_child_idx_;
}
/// Returns true if there are still rows to be returned from passthrough
children.
- inline bool HasMorePassthrough() const {
+ bool HasMorePassthrough() const {
return child_idx_ < first_materialized_child_idx_;
}
/// Returns true if there are still rows to be returned from children that
need
/// materialization.
- inline bool HasMoreMaterialized() const {
+ bool HasMoreMaterialized() const {
return first_materialized_child_idx_ != children_.size() &&
child_idx_ < children_.size();
}
/// Returns true if there are still rows to be returned from constant
expressions.
- inline bool HasMoreConst(const RuntimeState* state) const {
+ bool HasMoreConst(const RuntimeState* state) const {
return state->instance_ctx().per_fragment_instance_idx == 0 &&
const_expr_list_idx_ < const_expr_lists_.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index fa494ab..90c925d 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -580,9 +580,10 @@ BIGINT,BIGINT
# IMPALA-3678: union in a subplan - passthrough should be disabled.
select count(c.c_custkey), count(v.tot_price)
from tpch_nested_parquet.customer c, (
-select sum(o_totalprice) tot_price from c.c_orders
-union
-select sum(o_totalprice) tot_price from c.c_orders) v;
+ select sum(o_totalprice) tot_price from c.c_orders
+ union
+ select sum(o_totalprice) tot_price from c.c_orders
+) v;
---- RESULTS
150000,99996
---- TYPES
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/75553165/testdata/workloads/functional-query/queries/QueryTest/union.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/union.test
b/testdata/workloads/functional-query/queries/QueryTest/union.test
index ae75c12..5783c24 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/union.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/union.test
@@ -1121,3 +1121,32 @@ select t1.bigint_col from alltypestiny t1 inner join
---- TYPES
bigint
=====
+---- QUERY
+# IMPALA-4883: The second union operand references a char column, which causes
codegen
+# to fail and be disabled for that operand and all operands that follow it.
For this query
+# codegen is enabled only for the first operand.
+select count(s) from (
+ select cast(id as string) as s from alltypestiny
+ union all
+ select cast(cl as string) as s from functional.chars_tiny
+ union all
+ select cast(id as string) as s from alltypestiny
+) t
+---- RESULTS
+24
+---- TYPES
+bigint
+=====
+---- QUERY
+# IMPALA-4883: Verify that the union limit is enforced correctly
+select count(b) from (
+ select bigint_col as b from alltypestiny limit 4
+ union all
+ (select bigint_col as b from alltypestiny limit 4)
+ limit 7
+) t
+---- RESULTS
+7
+---- TYPES
+bigint
+=====