This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b42285f2de1 [feature](expr) support common subexpression elimination 
be part (#32673)
b42285f2de1 is described below

commit b42285f2de112e8fda21d27dcacea17408fdbe6b
Author: Mryange <[email protected]>
AuthorDate: Mon Apr 1 10:20:35 2024 +0800

    [feature](expr) support common subexpression elimination be part (#32673)
---
 be/src/exec/exec_node.cpp               | 83 ++++++++++++++++++++++++---------
 be/src/exec/exec_node.h                 | 24 ++++++++++
 be/src/pipeline/pipeline_x/operator.cpp | 52 ++++++++++++++++++---
 be/src/pipeline/pipeline_x/operator.h   | 42 +++++++++++++++++
 be/src/vec/core/block.cpp               |  9 ++++
 be/src/vec/core/block.h                 |  3 ++
 be/src/vec/exec/scan/vscanner.cpp       | 79 ++++++++++++++++++++-----------
 be/src/vec/exec/scan/vscanner.h         |  2 +
 gensrc/thrift/PlanNodes.thrift          |  5 +-
 9 files changed, 245 insertions(+), 54 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed032d09767..63b88aa9de2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, 
const DescriptorTbl
         _output_row_descriptor = std::make_unique<RowDescriptor>(
                 descs, std::vector {tnode.output_tuple_id}, std::vector 
{true});
     }
+    if (!tnode.intermediate_output_tuple_id_list.empty()) {
+        DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
+        // common subexpression elimination
+        DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
+                  tnode.intermediate_projections_list.size());
+        
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
+        for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
+            _intermediate_output_row_descriptor.push_back(
+                    RowDescriptor(descs, std::vector {output_tuple_id}, 
std::vector {true}));
+        }
+    }
+
     _query_statistics = std::make_shared<QueryStatistics>();
 }
 
@@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
         DCHECK(tnode.__isset.output_tuple_id);
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, 
_projections));
     }
-
+    if (!tnode.intermediate_projections_list.empty()) {
+        DCHECK(tnode.__isset.projections) << "no final projections";
+        
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
+        for (const auto& tnode_projections : 
tnode.intermediate_projections_list) {
+            vectorized::VExprContextSPtrs projections;
+            
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, 
projections));
+            _intermediate_projections.push_back(projections);
+        }
+    }
     return Status::OK();
 }
 
@@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
     }
 
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
+    for (int i = 0; i < _intermediate_projections.size(); i++) {
+        
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
+                                                   intermediate_row_desc(i)));
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
projections_row_desc()));
 
     for (auto& i : _children) {
         RETURN_IF_ERROR(i->prepare(state));
@@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->open(state));
     }
+    for (auto& projections : _intermediate_projections) {
+        RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
+    }
     RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
     return Status::OK();
 }
@@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
 Status ExecNode::do_projections(vectorized::Block* origin_block, 
vectorized::Block* output_block) {
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_projection_timer);
+    const size_t rows = origin_block->rows();
+    if (rows == 0) {
+        return Status::OK();
+    }
+    vectorized::Block input_block = *origin_block;
+
+    std::vector<int> result_column_ids;
+    for (auto& projections : _intermediate_projections) {
+        result_column_ids.resize(projections.size());
+        for (int i = 0; i < projections.size(); i++) {
+            RETURN_IF_ERROR(projections[i]->execute(&input_block, 
&result_column_ids[i]));
+        }
+        input_block.shuffle_columns(result_column_ids);
+    }
+
+    DCHECK_EQ(rows, input_block.rows());
     auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, 
size_t rows) {
         if (to->is_nullable() && !from->is_nullable()) {
             if (_keep_origin || !from->is_exclusive()) {
@@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block* 
origin_block, vectorized::Blo
     using namespace vectorized;
     MutableBlock mutable_block =
             VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*_output_row_descriptor);
-    auto rows = origin_block->rows();
 
-    if (rows != 0) {
-        auto& mutable_columns = mutable_block.mutable_columns();
+    auto& mutable_columns = mutable_block.mutable_columns();
 
-        if (mutable_columns.size() != _projections.size()) {
-            return Status::InternalError(
-                    "Logical error during processing {}, output of projections 
{} mismatches with "
-                    "exec node output {}",
-                    this->get_name(), _projections.size(), 
mutable_columns.size());
-        }
+    if (mutable_columns.size() != _projections.size()) {
+        return Status::InternalError(
+                "Logical error during processing {}, output of projections {} 
mismatches with "
+                "exec node output {}",
+                this->get_name(), _projections.size(), mutable_columns.size());
+    }
 
-        for (int i = 0; i < mutable_columns.size(); ++i) {
-            auto result_column_id = -1;
-            RETURN_IF_ERROR(_projections[i]->execute(origin_block, 
&result_column_id));
-            auto column_ptr = origin_block->get_by_position(result_column_id)
-                                      
.column->convert_to_full_column_if_const();
-            //TODO: this is a quick fix, we need a new function like 
"change_to_nullable" to do it
-            insert_column_datas(mutable_columns[i], column_ptr, rows);
-        }
-        DCHECK(mutable_block.rows() == rows);
-        output_block->set_columns(std::move(mutable_columns));
+    for (int i = 0; i < mutable_columns.size(); ++i) {
+        auto result_column_id = -1;
+        RETURN_IF_ERROR(_projections[i]->execute(&input_block, 
&result_column_id));
+        auto column_ptr = input_block.get_by_position(result_column_id)
+                                  .column->convert_to_full_column_if_const();
+        //TODO: this is a quick fix, we need a new function like 
"change_to_nullable" to do it
+        insert_column_datas(mutable_columns[i], column_ptr, rows);
     }
+    DCHECK(mutable_block.rows() == rows);
+    output_block->set_columns(std::move(mutable_columns));
 
     return Status::OK();
 }
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f2303068437..10b035835d7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -220,6 +220,26 @@ public:
         return _output_row_descriptor ? *_output_row_descriptor : 
_row_descriptor;
     }
     virtual const RowDescriptor& intermediate_row_desc() const { return 
_row_descriptor; }
+
+    //  input expr -> intermediate_projections[0] -> 
intermediate_projections[1] -> intermediate_projections[2]    ... ->     final 
projections         ->         output expr
+    //  prepare        _row_descriptor          intermediate_row_desc[0]       
      intermediate_row_desc[1]            intermediate_row_desc.end()          
_output_row_descriptor
+
+    [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
+        if (idx == 0) {
+            return intermediate_row_desc();
+        }
+        DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
+        return _intermediate_output_row_descriptor[idx - 1];
+    }
+
+    [[nodiscard]] const RowDescriptor& projections_row_desc() const {
+        if (_intermediate_output_row_descriptor.empty()) {
+            return intermediate_row_desc();
+        } else {
+            return _intermediate_output_row_descriptor.back();
+        }
+    }
+
     int64_t rows_returned() const { return _num_rows_returned; }
     int64_t limit() const { return _limit; }
     bool reached_limit() const { return _limit != -1 && _num_rows_returned >= 
_limit; }
@@ -270,6 +290,10 @@ protected:
     std::unique_ptr<RowDescriptor> _output_row_descriptor;
     vectorized::VExprContextSPtrs _projections;
 
+    std::vector<RowDescriptor> _intermediate_output_row_descriptor;
+    // Used in common subexpression elimination to compute intermediate 
results.
+    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
     /// Resource information sent from the frontend.
     const TBackendResourceProfile _resource_profile;
 
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 989b1ee00a5..4a16cb65a01 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -23,6 +23,8 @@
 #include <string>
 
 #include "common/logging.h"
+#include "common/status.h"
+#include "exec/exec_node.h"
 #include "pipeline/exec/aggregation_sink_operator.h"
 #include "pipeline/exec/aggregation_source_operator.h"
 #include "pipeline/exec/analytic_sink_operator.h"
@@ -123,10 +125,20 @@ Status OperatorXBase::init(const TPlanNode& tnode, 
RuntimeState* /*state*/) {
     }
 
     // create the projections expr
+
     if (tnode.__isset.projections) {
         DCHECK(tnode.__isset.output_tuple_id);
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, 
_projections));
     }
+    if (!tnode.intermediate_projections_list.empty()) {
+        DCHECK(tnode.__isset.projections) << "no final projections";
+        
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
+        for (const auto& tnode_projections : 
tnode.intermediate_projections_list) {
+            vectorized::VExprContextSPtrs projections;
+            
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, 
projections));
+            _intermediate_projections.push_back(projections);
+        }
+    }
     return Status::OK();
 }
 
@@ -134,8 +146,11 @@ Status OperatorXBase::prepare(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
     }
-
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
+    for (int i = 0; i < _intermediate_projections.size(); i++) {
+        
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
+                                                   intermediate_row_desc(i)));
+    }
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
projections_row_desc()));
 
     if (_child_x && !is_source()) {
         RETURN_IF_ERROR(_child_x->prepare(state));
@@ -149,6 +164,9 @@ Status OperatorXBase::open(RuntimeState* state) {
         RETURN_IF_ERROR(conjunct->open(state));
     }
     RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
+    for (auto& projections : _intermediate_projections) {
+        RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
+    }
     if (_child_x && !is_source()) {
         RETURN_IF_ERROR(_child_x->open(state));
     }
@@ -175,7 +193,22 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
     auto* local_state = state->get_local_state(operator_id());
     SCOPED_TIMER(local_state->exec_time_counter());
     SCOPED_TIMER(local_state->_projection_timer);
+    const size_t rows = origin_block->rows();
+    if (rows == 0) {
+        return Status::OK();
+    }
+    vectorized::Block input_block = *origin_block;
 
+    std::vector<int> result_column_ids;
+    for (const auto& projections : _intermediate_projections) {
+        result_column_ids.resize(projections.size());
+        for (int i = 0; i < projections.size(); i++) {
+            RETURN_IF_ERROR(projections[i]->execute(&input_block, 
&result_column_ids[i]));
+        }
+        input_block.shuffle_columns(result_column_ids);
+    }
+
+    DCHECK_EQ(rows, input_block.rows());
     auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, 
size_t rows) {
         if (to->is_nullable() && !from->is_nullable()) {
             if (_keep_origin || !from->is_exclusive()) {
@@ -198,15 +231,13 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
     vectorized::MutableBlock mutable_block =
             
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                        
*_output_row_descriptor);
-    auto rows = origin_block->rows();
-
     if (rows != 0) {
         auto& mutable_columns = mutable_block.mutable_columns();
         DCHECK(mutable_columns.size() == local_state->_projections.size());
         for (int i = 0; i < mutable_columns.size(); ++i) {
             auto result_column_id = -1;
-            
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, 
&result_column_id));
-            auto column_ptr = origin_block->get_by_position(result_column_id)
+            
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, 
&result_column_id));
+            auto column_ptr = input_block.get_by_position(result_column_id)
                                       
.column->convert_to_full_column_if_const();
             insert_column_datas(mutable_columns[i], column_ptr, rows);
         }
@@ -365,6 +396,15 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     for (size_t i = 0; i < _projections.size(); i++) {
         RETURN_IF_ERROR(_parent->_projections[i]->clone(state, 
_projections[i]));
     }
+    
_intermediate_projections.resize(_parent->_intermediate_projections.size());
+    for (int i = 0; i < _parent->_intermediate_projections.size(); i++) {
+        
_intermediate_projections[i].resize(_parent->_intermediate_projections[i].size());
+        for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++) 
{
+            RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone(
+                    state, _intermediate_projections[i][j]));
+        }
+    }
+
     _rows_returned_counter =
             ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", 
TUnit::UNIT, 1);
     _blocks_returned_counter =
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index b1667affe61..aa2bf7aa5e0 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -135,6 +135,9 @@ protected:
     RuntimeState* _state = nullptr;
     vectorized::VExprContextSPtrs _conjuncts;
     vectorized::VExprContextSPtrs _projections;
+    // Used in common subexpression elimination to compute intermediate 
results.
+    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
     bool _closed = false;
     vectorized::Block _origin_block;
 };
@@ -155,6 +158,22 @@ public:
         if (tnode.__isset.output_tuple_id) {
             _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
         }
+        if (tnode.__isset.output_tuple_id) {
+            _output_row_descriptor = std::make_unique<RowDescriptor>(
+                    descs, std::vector {tnode.output_tuple_id}, std::vector 
{true});
+        }
+        if (!tnode.intermediate_output_tuple_id_list.empty()) {
+            DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple 
id";
+            // common subexpression elimination
+            DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
+                      tnode.intermediate_projections_list.size());
+            _intermediate_output_row_descriptor.reserve(
+                    tnode.intermediate_output_tuple_id_list.size());
+            for (auto output_tuple_id : 
tnode.intermediate_output_tuple_id_list) {
+                _intermediate_output_row_descriptor.push_back(
+                        RowDescriptor(descs, std::vector {output_tuple_id}, 
std::vector {true}));
+            }
+        }
     }
 
     OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
@@ -247,6 +266,25 @@ public:
         return _row_descriptor;
     }
 
+    //  input expr -> intermediate_projections[0] -> 
intermediate_projections[1] -> intermediate_projections[2]    ... ->     final 
projections         ->         output expr
+    //  prepare        _row_descriptor          intermediate_row_desc[0]       
      intermediate_row_desc[1]            intermediate_row_desc.end()          
_output_row_descriptor
+
+    [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
+        if (idx == 0) {
+            return intermediate_row_desc();
+        }
+        DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
+        return _intermediate_output_row_descriptor[idx - 1];
+    }
+
+    [[nodiscard]] const RowDescriptor& projections_row_desc() const {
+        if (_intermediate_output_row_descriptor.empty()) {
+            return intermediate_row_desc();
+        } else {
+            return _intermediate_output_row_descriptor.back();
+        }
+    }
+
     [[nodiscard]] std::string debug_string() const override { return ""; }
 
     virtual std::string debug_string(int indentation_level = 0) const;
@@ -318,6 +356,10 @@ protected:
     std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
     vectorized::VExprContextSPtrs _projections;
 
+    std::vector<RowDescriptor> _intermediate_output_row_descriptor;
+    // Used in common subexpression elimination to compute intermediate 
results.
+    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+
     /// Resource information sent from the frontend.
     const TBackendResourceProfile _resource_profile;
 
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index c93bfb11f09..1d8d3e83801 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -719,6 +719,15 @@ void Block::swap(Block&& other) noexcept {
     row_same_bit = std::move(other.row_same_bit);
 }
 
+void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
+    Container tmp_data;
+    tmp_data.reserve(result_column_ids.size());
+    for (const int result_column_id : result_column_ids) {
+        tmp_data.push_back(data[result_column_id]);
+    }
+    swap(Block {tmp_data});
+}
+
 void Block::update_hash(SipHash& hash) const {
     for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) {
         for (const auto& col : data) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ce32cc5cf39..d6567de0a44 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -234,6 +234,9 @@ public:
     void swap(Block& other) noexcept;
     void swap(Block&& other) noexcept;
 
+    // Shuffle columns in place based on the result_column_ids
+    void shuffle_columns(const std::vector<int>& result_column_ids);
+
     // Default column size = -1 means clear all column in block
     // Else clear column [0, column_size) delete column [column_size, 
data.size)
     void clear_column_data(int column_size = -1) noexcept;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 7354b9e085f..de0b6b45691 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 
 #include "common/config.h"
+#include "exec/exec_node.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
 #include "util/defer_op.h"
@@ -69,6 +70,19 @@ Status VScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& conjuncts
         }
     }
 
+    const auto& intermediate_projections =
+            _parent ? _parent->_intermediate_projections : 
_local_state->_intermediate_projections;
+    if (!intermediate_projections.empty()) {
+        _intermediate_projections.resize(intermediate_projections.size());
+        for (int i = 0; i < intermediate_projections.size(); i++) {
+            
_intermediate_projections[i].resize(intermediate_projections[i].size());
+            for (int j = 0; j < intermediate_projections[i].size(); j++) {
+                RETURN_IF_ERROR(intermediate_projections[i][j]->clone(
+                        state, _intermediate_projections[i][j]));
+            }
+        }
+    }
+
     return Status::OK();
 }
 
@@ -172,42 +186,55 @@ Status VScanner::_filter_output_block(Block* block) {
 }
 
 Status VScanner::_do_projections(vectorized::Block* origin_block, 
vectorized::Block* output_block) {
-    auto projection_timer = _parent ? _parent->_projection_timer : 
_local_state->_projection_timer;
-    auto exec_timer = _parent ? _parent->_exec_timer : 
_local_state->_exec_timer;
+    auto& projection_timer = _parent ? _parent->_projection_timer : 
_local_state->_projection_timer;
+    auto& exec_timer = _parent ? _parent->_exec_timer : 
_local_state->_exec_timer;
     SCOPED_TIMER(exec_timer);
     SCOPED_TIMER(projection_timer);
 
+    const size_t rows = origin_block->rows();
+    if (rows == 0) {
+        return Status::OK();
+    }
+    vectorized::Block input_block = *origin_block;
+
+    std::vector<int> result_column_ids;
+    for (auto& projections : _intermediate_projections) {
+        result_column_ids.resize(projections.size());
+        for (int i = 0; i < projections.size(); i++) {
+            RETURN_IF_ERROR(projections[i]->execute(&input_block, 
&result_column_ids[i]));
+        }
+        input_block.shuffle_columns(result_column_ids);
+    }
+
+    DCHECK_EQ(rows, input_block.rows());
     MutableBlock mutable_block =
             VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*_output_row_descriptor);
-    auto rows = origin_block->rows();
 
-    if (rows != 0) {
-        auto& mutable_columns = mutable_block.mutable_columns();
+    auto& mutable_columns = mutable_block.mutable_columns();
 
-        if (mutable_columns.size() != _projections.size()) {
-            return Status::InternalError(
-                    "Logical error in scanner, output of projections {} 
mismatches with "
-                    "scanner output {}",
-                    _projections.size(), mutable_columns.size());
-        }
+    if (mutable_columns.size() != _projections.size()) {
+        return Status::InternalError(
+                "Logical error in scanner, output of projections {} mismatches 
with "
+                "scanner output {}",
+                _projections.size(), mutable_columns.size());
+    }
 
-        for (int i = 0; i < mutable_columns.size(); ++i) {
-            auto result_column_id = -1;
-            RETURN_IF_ERROR(_projections[i]->execute(origin_block, 
&result_column_id));
-            auto column_ptr = origin_block->get_by_position(result_column_id)
-                                      
.column->convert_to_full_column_if_const();
-            //TODO: this is a quick fix, we need a new function like 
"change_to_nullable" to do it
-            if (mutable_columns[i]->is_nullable() xor 
column_ptr->is_nullable()) {
-                DCHECK(mutable_columns[i]->is_nullable() && 
!column_ptr->is_nullable());
-                reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
-                        ->insert_range_from_not_nullable(*column_ptr, 0, rows);
-            } else {
-                mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
-            }
+    for (int i = 0; i < mutable_columns.size(); ++i) {
+        auto result_column_id = -1;
+        RETURN_IF_ERROR(_projections[i]->execute(&input_block, 
&result_column_id));
+        auto column_ptr = input_block.get_by_position(result_column_id)
+                                  .column->convert_to_full_column_if_const();
+        //TODO: this is a quick fix, we need a new function like 
"change_to_nullable" to do it
+        if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
+            DCHECK(mutable_columns[i]->is_nullable() && 
!column_ptr->is_nullable());
+            reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+                    ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+        } else {
+            mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
         }
-        DCHECK(mutable_block.rows() == rows);
-        output_block->set_columns(std::move(mutable_columns));
     }
+    DCHECK(mutable_block.rows() == rows);
+    output_block->set_columns(std::move(mutable_columns));
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index cf91446c4f6..ba953192507 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -203,6 +203,8 @@ protected:
     // It includes predicate in SQL and runtime filters.
     VExprContextSPtrs _conjuncts;
     VExprContextSPtrs _projections;
+    // Used in common subexpression elimination to compute intermediate 
results.
+    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
     vectorized::Block _origin_block;
 
     VExprContextSPtrs _common_expr_ctxs_push_down;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2fadcdae538..d88ab993363 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1294,10 +1294,13 @@ struct TPlanNode {
   49: optional i64 push_down_count
 
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
-  
+  // projections is final projections, which means projecting into results and 
materializing them into the output block.
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
   103: optional TPartitionSortNode partition_sort_node
+  // Intermediate projections will not materialize into the output block.
+  104: optional list<list<Exprs.TExpr>> intermediate_projections_list
+  105: optional list<Types.TTupleId> intermediate_output_tuple_id_list
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to