This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b42285f2de1 [feature](expr) support common subexpression elimination be part (#32673)
b42285f2de1 is described below
commit b42285f2de112e8fda21d27dcacea17408fdbe6b
Author: Mryange <[email protected]>
AuthorDate: Mon Apr 1 10:20:35 2024 +0800
[feature](expr) support common subexpression elimination be part (#32673)
---
be/src/exec/exec_node.cpp | 83 ++++++++++++++++++++++++---------
be/src/exec/exec_node.h | 24 ++++++++++
be/src/pipeline/pipeline_x/operator.cpp | 52 ++++++++++++++++++---
be/src/pipeline/pipeline_x/operator.h | 42 +++++++++++++++++
be/src/vec/core/block.cpp | 9 ++++
be/src/vec/core/block.h | 3 ++
be/src/vec/exec/scan/vscanner.cpp | 79 ++++++++++++++++++++-----------
be/src/vec/exec/scan/vscanner.h | 2 +
gensrc/thrift/PlanNodes.thrift | 5 +-
9 files changed, 245 insertions(+), 54 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed032d09767..63b88aa9de2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector
{true});
}
+ if (!tnode.intermediate_output_tuple_id_list.empty()) {
+ DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
+ // common subexpression elimination
+ DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
+ tnode.intermediate_projections_list.size());
+
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
+ for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
+ _intermediate_output_row_descriptor.push_back(
+ RowDescriptor(descs, std::vector {output_tuple_id},
std::vector {true}));
+ }
+ }
+
_query_statistics = std::make_shared<QueryStatistics>();
}
@@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode,
RuntimeState* state) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections,
_projections));
}
-
+ if (!tnode.intermediate_projections_list.empty()) {
+ DCHECK(tnode.__isset.projections) << "no final projections";
+
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
+ for (const auto& tnode_projections :
tnode.intermediate_projections_list) {
+ vectorized::VExprContextSPtrs projections;
+
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections,
projections));
+ _intermediate_projections.push_back(projections);
+ }
+ }
return Status::OK();
}
@@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
intermediate_row_desc()));
+ for (int i = 0; i < _intermediate_projections.size(); i++) {
+
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
+ intermediate_row_desc(i)));
+ }
+
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
projections_row_desc()));
for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
@@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
+ for (auto& projections : _intermediate_projections) {
+ RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
+ }
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
return Status::OK();
}
@@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block,
vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
+ const size_t rows = origin_block->rows();
+ if (rows == 0) {
+ return Status::OK();
+ }
+ vectorized::Block input_block = *origin_block;
+
+ std::vector<int> result_column_ids;
+ for (auto& projections : _intermediate_projections) {
+ result_column_ids.resize(projections.size());
+ for (int i = 0; i < projections.size(); i++) {
+ RETURN_IF_ERROR(projections[i]->execute(&input_block,
&result_column_ids[i]));
+ }
+ input_block.shuffle_columns(result_column_ids);
+ }
+
+ DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from,
size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
@@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block*
origin_block, vectorized::Blo
using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
- auto rows = origin_block->rows();
- if (rows != 0) {
- auto& mutable_columns = mutable_block.mutable_columns();
+ auto& mutable_columns = mutable_block.mutable_columns();
- if (mutable_columns.size() != _projections.size()) {
- return Status::InternalError(
- "Logical error during processing {}, output of projections
{} mismatches with "
- "exec node output {}",
- this->get_name(), _projections.size(),
mutable_columns.size());
- }
+ if (mutable_columns.size() != _projections.size()) {
+ return Status::InternalError(
+ "Logical error during processing {}, output of projections {}
mismatches with "
+ "exec node output {}",
+ this->get_name(), _projections.size(), mutable_columns.size());
+ }
- for (int i = 0; i < mutable_columns.size(); ++i) {
- auto result_column_id = -1;
- RETURN_IF_ERROR(_projections[i]->execute(origin_block,
&result_column_id));
- auto column_ptr = origin_block->get_by_position(result_column_id)
-
.column->convert_to_full_column_if_const();
- //TODO: this is a quick fix, we need a new function like
"change_to_nullable" to do it
- insert_column_datas(mutable_columns[i], column_ptr, rows);
- }
- DCHECK(mutable_block.rows() == rows);
- output_block->set_columns(std::move(mutable_columns));
+ for (int i = 0; i < mutable_columns.size(); ++i) {
+ auto result_column_id = -1;
+ RETURN_IF_ERROR(_projections[i]->execute(&input_block,
&result_column_id));
+ auto column_ptr = input_block.get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ //TODO: this is a quick fix, we need a new function like
"change_to_nullable" to do it
+ insert_column_datas(mutable_columns[i], column_ptr, rows);
}
+ DCHECK(mutable_block.rows() == rows);
+ output_block->set_columns(std::move(mutable_columns));
return Status::OK();
}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f2303068437..10b035835d7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -220,6 +220,26 @@ public:
return _output_row_descriptor ? *_output_row_descriptor :
_row_descriptor;
}
virtual const RowDescriptor& intermediate_row_desc() const { return
_row_descriptor; }
+
+ // input expr -> intermediate_projections[0] ->
intermediate_projections[1] -> intermediate_projections[2] ... -> final
projections -> output expr
+ // prepare _row_descriptor intermediate_row_desc[0]
intermediate_row_desc[1] intermediate_row_desc.end()
_output_row_descriptor
+
+ [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
+ if (idx == 0) {
+ return intermediate_row_desc();
+ }
+ DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
+ return _intermediate_output_row_descriptor[idx - 1];
+ }
+
+ [[nodiscard]] const RowDescriptor& projections_row_desc() const {
+ if (_intermediate_output_row_descriptor.empty()) {
+ return intermediate_row_desc();
+ } else {
+ return _intermediate_output_row_descriptor.back();
+ }
+ }
+
int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >=
_limit; }
@@ -270,6 +290,10 @@ protected:
std::unique_ptr<RowDescriptor> _output_row_descriptor;
vectorized::VExprContextSPtrs _projections;
+ std::vector<RowDescriptor> _intermediate_output_row_descriptor;
+ // Used in common subexpression elimination to compute intermediate
results.
+ std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 989b1ee00a5..4a16cb65a01 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -23,6 +23,8 @@
#include <string>
#include "common/logging.h"
+#include "common/status.h"
+#include "exec/exec_node.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
@@ -123,10 +125,20 @@ Status OperatorXBase::init(const TPlanNode& tnode,
RuntimeState* /*state*/) {
}
// create the projections expr
+
if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections,
_projections));
}
+ if (!tnode.intermediate_projections_list.empty()) {
+ DCHECK(tnode.__isset.projections) << "no final projections";
+
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
+ for (const auto& tnode_projections :
tnode.intermediate_projections_list) {
+ vectorized::VExprContextSPtrs projections;
+
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections,
projections));
+ _intermediate_projections.push_back(projections);
+ }
+ }
return Status::OK();
}
@@ -134,8 +146,11 @@ Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
-
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
intermediate_row_desc()));
+ for (int i = 0; i < _intermediate_projections.size(); i++) {
+
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
+ intermediate_row_desc(i)));
+ }
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
projections_row_desc()));
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->prepare(state));
@@ -149,6 +164,9 @@ Status OperatorXBase::open(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->open(state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
+ for (auto& projections : _intermediate_projections) {
+ RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
+ }
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->open(state));
}
@@ -175,7 +193,22 @@ Status OperatorXBase::do_projections(RuntimeState* state,
vectorized::Block* ori
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);
+ const size_t rows = origin_block->rows();
+ if (rows == 0) {
+ return Status::OK();
+ }
+ vectorized::Block input_block = *origin_block;
+ std::vector<int> result_column_ids;
+ for (const auto& projections : _intermediate_projections) {
+ result_column_ids.resize(projections.size());
+ for (int i = 0; i < projections.size(); i++) {
+ RETURN_IF_ERROR(projections[i]->execute(&input_block,
&result_column_ids[i]));
+ }
+ input_block.shuffle_columns(result_column_ids);
+ }
+
+ DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from,
size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
@@ -198,15 +231,13 @@ Status OperatorXBase::do_projections(RuntimeState* state,
vectorized::Block* ori
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
- auto rows = origin_block->rows();
-
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
DCHECK(mutable_columns.size() == local_state->_projections.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
-
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block,
&result_column_id));
- auto column_ptr = origin_block->get_by_position(result_column_id)
+
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block,
&result_column_id));
+ auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
@@ -365,6 +396,15 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state,
_projections[i]));
}
+
_intermediate_projections.resize(_parent->_intermediate_projections.size());
+ for (int i = 0; i < _parent->_intermediate_projections.size(); i++) {
+
_intermediate_projections[i].resize(_parent->_intermediate_projections[i].size());
+ for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++)
{
+ RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone(
+ state, _intermediate_projections[i][j]));
+ }
+ }
+
_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced",
TUnit::UNIT, 1);
_blocks_returned_counter =
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index b1667affe61..aa2bf7aa5e0 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -135,6 +135,9 @@ protected:
RuntimeState* _state = nullptr;
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
+ // Used in common subexpression elimination to compute intermediate
results.
+ std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
bool _closed = false;
vectorized::Block _origin_block;
};
@@ -155,6 +158,22 @@ public:
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs,
{tnode.output_tuple_id}, {true}));
}
+ if (tnode.__isset.output_tuple_id) {
+ _output_row_descriptor = std::make_unique<RowDescriptor>(
+ descs, std::vector {tnode.output_tuple_id}, std::vector
{true});
+ }
+ if (!tnode.intermediate_output_tuple_id_list.empty()) {
+ DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple
id";
+ // common subexpression elimination
+ DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
+ tnode.intermediate_projections_list.size());
+ _intermediate_output_row_descriptor.reserve(
+ tnode.intermediate_output_tuple_id_list.size());
+ for (auto output_tuple_id :
tnode.intermediate_output_tuple_id_list) {
+ _intermediate_output_row_descriptor.push_back(
+ RowDescriptor(descs, std::vector {output_tuple_id},
std::vector {true}));
+ }
+ }
}
OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
@@ -247,6 +266,25 @@ public:
return _row_descriptor;
}
+ // input expr -> intermediate_projections[0] ->
intermediate_projections[1] -> intermediate_projections[2] ... -> final
projections -> output expr
+ // prepare _row_descriptor intermediate_row_desc[0]
intermediate_row_desc[1] intermediate_row_desc.end()
_output_row_descriptor
+
+ [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
+ if (idx == 0) {
+ return intermediate_row_desc();
+ }
+ DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
+ return _intermediate_output_row_descriptor[idx - 1];
+ }
+
+ [[nodiscard]] const RowDescriptor& projections_row_desc() const {
+ if (_intermediate_output_row_descriptor.empty()) {
+ return intermediate_row_desc();
+ } else {
+ return _intermediate_output_row_descriptor.back();
+ }
+ }
+
[[nodiscard]] std::string debug_string() const override { return ""; }
virtual std::string debug_string(int indentation_level = 0) const;
@@ -318,6 +356,10 @@ protected:
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;
+ std::vector<RowDescriptor> _intermediate_output_row_descriptor;
+ // Used in common subexpression elimination to compute intermediate
results.
+ std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index c93bfb11f09..1d8d3e83801 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -719,6 +719,15 @@ void Block::swap(Block&& other) noexcept {
row_same_bit = std::move(other.row_same_bit);
}
+void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
+ Container tmp_data;
+ tmp_data.reserve(result_column_ids.size());
+ for (const int result_column_id : result_column_ids) {
+ tmp_data.push_back(data[result_column_id]);
+ }
+ swap(Block {tmp_data});
+}
+
void Block::update_hash(SipHash& hash) const {
for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) {
for (const auto& col : data) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ce32cc5cf39..d6567de0a44 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -234,6 +234,9 @@ public:
void swap(Block& other) noexcept;
void swap(Block&& other) noexcept;
+ // Shuffle columns in place based on the result_column_ids
+ void shuffle_columns(const std::vector<int>& result_column_ids);
+
// Default column size = -1 means clear all column in block
// Else clear column [0, column_size) delete column [column_size,
data.size)
void clear_column_data(int column_size = -1) noexcept;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 7354b9e085f..de0b6b45691 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -20,6 +20,7 @@
#include <glog/logging.h>
#include "common/config.h"
+#include "exec/exec_node.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "util/defer_op.h"
@@ -69,6 +70,19 @@ Status VScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& conjuncts
}
}
+ const auto& intermediate_projections =
+ _parent ? _parent->_intermediate_projections :
_local_state->_intermediate_projections;
+ if (!intermediate_projections.empty()) {
+ _intermediate_projections.resize(intermediate_projections.size());
+ for (int i = 0; i < intermediate_projections.size(); i++) {
+
_intermediate_projections[i].resize(intermediate_projections[i].size());
+ for (int j = 0; j < intermediate_projections[i].size(); j++) {
+ RETURN_IF_ERROR(intermediate_projections[i][j]->clone(
+ state, _intermediate_projections[i][j]));
+ }
+ }
+ }
+
return Status::OK();
}
@@ -172,42 +186,55 @@ Status VScanner::_filter_output_block(Block* block) {
}
Status VScanner::_do_projections(vectorized::Block* origin_block,
vectorized::Block* output_block) {
- auto projection_timer = _parent ? _parent->_projection_timer :
_local_state->_projection_timer;
- auto exec_timer = _parent ? _parent->_exec_timer :
_local_state->_exec_timer;
+ auto& projection_timer = _parent ? _parent->_projection_timer :
_local_state->_projection_timer;
+ auto& exec_timer = _parent ? _parent->_exec_timer :
_local_state->_exec_timer;
SCOPED_TIMER(exec_timer);
SCOPED_TIMER(projection_timer);
+ const size_t rows = origin_block->rows();
+ if (rows == 0) {
+ return Status::OK();
+ }
+ vectorized::Block input_block = *origin_block;
+
+ std::vector<int> result_column_ids;
+ for (auto& projections : _intermediate_projections) {
+ result_column_ids.resize(projections.size());
+ for (int i = 0; i < projections.size(); i++) {
+ RETURN_IF_ERROR(projections[i]->execute(&input_block,
&result_column_ids[i]));
+ }
+ input_block.shuffle_columns(result_column_ids);
+ }
+
+ DCHECK_EQ(rows, input_block.rows());
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
- auto rows = origin_block->rows();
- if (rows != 0) {
- auto& mutable_columns = mutable_block.mutable_columns();
+ auto& mutable_columns = mutable_block.mutable_columns();
- if (mutable_columns.size() != _projections.size()) {
- return Status::InternalError(
- "Logical error in scanner, output of projections {}
mismatches with "
- "scanner output {}",
- _projections.size(), mutable_columns.size());
- }
+ if (mutable_columns.size() != _projections.size()) {
+ return Status::InternalError(
+ "Logical error in scanner, output of projections {} mismatches
with "
+ "scanner output {}",
+ _projections.size(), mutable_columns.size());
+ }
- for (int i = 0; i < mutable_columns.size(); ++i) {
- auto result_column_id = -1;
- RETURN_IF_ERROR(_projections[i]->execute(origin_block,
&result_column_id));
- auto column_ptr = origin_block->get_by_position(result_column_id)
-
.column->convert_to_full_column_if_const();
- //TODO: this is a quick fix, we need a new function like
"change_to_nullable" to do it
- if (mutable_columns[i]->is_nullable() xor
column_ptr->is_nullable()) {
- DCHECK(mutable_columns[i]->is_nullable() &&
!column_ptr->is_nullable());
- reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
- ->insert_range_from_not_nullable(*column_ptr, 0, rows);
- } else {
- mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
- }
+ for (int i = 0; i < mutable_columns.size(); ++i) {
+ auto result_column_id = -1;
+ RETURN_IF_ERROR(_projections[i]->execute(&input_block,
&result_column_id));
+ auto column_ptr = input_block.get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ //TODO: this is a quick fix, we need a new function like
"change_to_nullable" to do it
+ if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
+ DCHECK(mutable_columns[i]->is_nullable() &&
!column_ptr->is_nullable());
+ reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+ ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+ } else {
+ mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
- DCHECK(mutable_block.rows() == rows);
- output_block->set_columns(std::move(mutable_columns));
}
+ DCHECK(mutable_block.rows() == rows);
+ output_block->set_columns(std::move(mutable_columns));
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index cf91446c4f6..ba953192507 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -203,6 +203,8 @@ protected:
// It includes predicate in SQL and runtime filters.
VExprContextSPtrs _conjuncts;
VExprContextSPtrs _projections;
+ // Used in common subexpression elimination to compute intermediate
results.
+ std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
vectorized::Block _origin_block;
VExprContextSPtrs _common_expr_ctxs_push_down;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2fadcdae538..d88ab993363 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1294,10 +1294,13 @@ struct TPlanNode {
49: optional i64 push_down_count
50: optional list<list<Exprs.TExpr>> distribute_expr_lists
-
+ // projections is final projections, which means projecting into results and
materializing them into the output block.
101: optional list<Exprs.TExpr> projections
102: optional Types.TTupleId output_tuple_id
103: optional TPartitionSortNode partition_sort_node
+ // Intermediate projections will not materialize into the output block.
+ 104: optional list<list<Exprs.TExpr>> intermediate_projections_list
+ 105: optional list<Types.TTupleId> intermediate_output_tuple_id_list
}
// A flattened representation of a tree of PlanNodes, obtained by depth-first
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]