This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a74ad1702 [feature](Nereids)add the ability of projection on each 
ExecNode and add column prune on OlapScan (#11842)
9a74ad1702 is described below

commit 9a74ad1702fdf850e758934d8237402fac60b0d6
Author: Kikyou1997 <[email protected]>
AuthorDate: Tue Aug 30 16:17:10 2022 +0800

    [feature](Nereids)add the ability of projection on each ExecNode and add 
column prune on OlapScan (#11842)
    
    We have added logical project before, but to actually finish the prune to 
reduce the data IO, we need to add related supports in translator and BE.
    This PR:
    - add projections on each ExecNode in BE
    - translate PhysicalProject into projections on PlanNode in FE
    - do column prune on ScanNode in FE
    
    Co-authored-by: HappenLee <[email protected]>
---
 be/src/exec/exec_node.cpp                          | 57 ++++++++++++++-
 be/src/exec/exec_node.h                            | 14 +++-
 be/src/runtime/plan_fragment_executor.cpp          |  5 +-
 be/src/vec/exec/join/vhash_join_node.cpp           |  8 ++-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  5 +-
 be/src/vec/exec/vaggregation_node.cpp              |  4 +-
 be/src/vec/exec/vanalytic_eval_node.cpp            |  5 +-
 be/src/vec/exec/vassert_num_rows_node.cpp          |  4 +-
 be/src/vec/exec/vblocking_join_node.cpp            |  5 +-
 be/src/vec/exec/vcross_join_node.cpp               |  5 +-
 be/src/vec/exec/vrepeat_node.cpp                   |  2 +-
 be/src/vec/exec/vselect_node.cpp                   |  5 +-
 be/src/vec/exec/vset_operation_node.cpp            |  7 +-
 be/src/vec/exec/vsort_node.cpp                     |  2 +-
 be/src/vec/exec/vtable_function_node.cpp           |  2 +-
 be/src/vec/exec/vunion_node.cpp                    |  7 +-
 be/src/vec/exprs/vexpr.cpp                         |  8 +--
 be/src/vec/functions/like.cpp                      | 11 ++-
 .../org/apache/doris/analysis/SlotDescriptor.java  |  6 ++
 .../glue/translator/ExpressionTranslator.java      |  3 +-
 .../glue/translator/PhysicalPlanTranslator.java    | 78 +++++++++++++-------
 .../glue/translator/PlanTranslatorContext.java     | 15 ++--
 .../org/apache/doris/nereids/jobs/JobType.java     |  2 +-
 .../doris/nereids/jobs/batch/RewriteJob.java       |  2 +
 .../doris/nereids/stats/StatsCalculator.java       |  2 +-
 .../nereids/trees/expressions/Expression.java      | 23 ------
 .../nereids/trees/expressions/SlotReference.java   |  4 --
 .../doris/nereids/trees/plans/AbstractPlan.java    | 12 ++++
 .../doris/nereids/trees/plans/GroupPlan.java       |  1 +
 .../org/apache/doris/nereids/trees/plans/Plan.java |  4 --
 .../trees/plans/physical/AbstractPhysicalPlan.java |  7 --
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  2 +-
 .../org/apache/doris/planner/ExchangeNode.java     | 13 +++-
 .../org/apache/doris/planner/HashJoinNode.java     | 29 +++++++-
 .../org/apache/doris/planner/OlapScanNode.java     |  1 -
 .../java/org/apache/doris/planner/PlanNode.java    | 31 +++++++-
 .../apache/doris/planner/SingleNodePlanner.java    |  2 +-
 .../translator/PhysicalPlanTranslatorTest.java     | 82 ++++++++++++++++++++++
 .../apache/doris/nereids/util/PlanRewriter.java    |  4 +-
 .../doris/planner/DistributedPlannerTest.java      |  2 +
 40 files changed, 352 insertions(+), 129 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 0b643135b0..9a2b335728 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -150,9 +150,13 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& 
tnode, const DescriptorTbl
           _rows_returned_rate(nullptr),
           _memory_used_counter(nullptr),
           _get_next_span(),
-          _is_closed(false) {}
+          _is_closed(false) {
+    if (tnode.__isset.output_tuple_id) {
+        _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
+    }
+}
 
-ExecNode::~ExecNode() {}
+ExecNode::~ExecNode() = default;
 
 void ExecNode::push_down_predicate(RuntimeState* state, 
std::list<ExprContext*>* expr_ctxs) {
     if (_type != TPlanNodeType::AGGREGATION_NODE) {
@@ -194,6 +198,13 @@ Status ExecNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     }
     RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, 
&_conjunct_ctxs));
 
+    // create the projections expr
+    if (tnode.__isset.projections) {
+        DCHECK(tnode.__isset.output_tuple_id);
+        RETURN_IF_ERROR(
+                vectorized::VExpr::create_expr_trees(_pool, tnode.projections, 
&_projections));
+    }
+
     return Status::OK();
 }
 
@@ -220,6 +231,7 @@ Status ExecNode::prepare(RuntimeState* state) {
         typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
         RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor));
     }
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
_row_descriptor));
 
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->prepare(state));
@@ -239,6 +251,8 @@ Status ExecNode::open(RuntimeState* state) {
     } else {
         return Status::OK();
     }
+    RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state));
+    return vectorized::VExpr::open(_projections, state);
 }
 
 Status ExecNode::reset(RuntimeState* state) {
@@ -282,6 +296,7 @@ Status ExecNode::close(RuntimeState* state) {
         typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
         Expr::close(_conjunct_ctxs, state);
     }
+    vectorized::VExpr::close(_projections, state);
 
     if (_buffer_pool_client.is_registered()) {
         
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
@@ -769,4 +784,42 @@ std::string ExecNode::get_name() {
     return (_is_vec ? "V" : "") + print_plan_node_type(_type);
 }
 
+Status ExecNode::do_projections(vectorized::Block* origin_block, 
vectorized::Block* output_block) {
+    using namespace vectorized;
+    auto is_mem_reuse = output_block->mem_reuse();
+    MutableBlock mutable_block =
+            is_mem_reuse ? MutableBlock(output_block)
+                         : 
MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
+                                   *_output_row_descriptor));
+    auto rows = origin_block->rows();
+
+    if (rows != 0) {
+        auto& mutable_columns = mutable_block.mutable_columns();
+        DCHECK(mutable_columns.size() == _projections.size());
+        for (int i = 0; i < mutable_columns.size(); ++i) {
+            auto result_column_id = -1;
+            RETURN_IF_ERROR(_projections[i]->execute(origin_block, 
&result_column_id));
+            auto column_ptr = origin_block->get_by_position(result_column_id)
+                                      
.column->convert_to_full_column_if_const();
+            mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+        }
+
+        if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
+        DCHECK(output_block->rows() == rows);
+    }
+
+    return Status::OK();
+}
+
+Status ExecNode::get_next_after_projects(RuntimeState* state, 
vectorized::Block* block, bool* eos) {
+    // delete the UNLIKELY after support new optimizers
+    if (UNLIKELY(_output_row_descriptor)) {
+        
_origin_block.clear_column_data(_row_descriptor.num_materialized_slots());
+        auto status = get_next(state, &_origin_block, eos);
+        if (UNLIKELY(!status.ok())) return status;
+        return do_projections(&_origin_block, block);
+    }
+    return get_next(state, block, eos);
+}
+
 } // namespace doris
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 5b700090b2..c9f4323dca 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -105,6 +105,9 @@ public:
     virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos);
     virtual Status get_next(RuntimeState* state, vectorized::Block* block, 
bool* eos);
 
+    // new interface to compatible new optimizers in FE
+    Status get_next_after_projects(RuntimeState* state, vectorized::Block* 
block, bool* eos);
+
     // Resets the stream of row batches to be retrieved by subsequent 
GetNext() calls.
     // Clears all internal state, returning this node to the state it was in 
after calling
     // Prepare() and before calling Open(). This function must not clear memory
@@ -179,7 +182,9 @@ public:
 
     int id() const { return _id; }
     TPlanNodeType::type type() const { return _type; }
-    virtual const RowDescriptor& row_desc() const { return _row_descriptor; }
+    virtual const RowDescriptor& row_desc() const {
+        return _output_row_descriptor ? *_output_row_descriptor : 
_row_descriptor;
+    }
     int64_t rows_returned() const { return _num_rows_returned; }
     int64_t limit() const { return _limit; }
     bool reached_limit() const { return _limit != -1 && _num_rows_returned >= 
_limit; }
@@ -221,6 +226,9 @@ protected:
     // and add block rows for profile
     void reached_limit(vectorized::Block* block, bool* eos);
 
+    /// Only use in vectorized exec engine try to do projections to trans 
_row_desc -> _output_row_desc
+    Status do_projections(vectorized::Block* origin_block, vectorized::Block* 
output_block);
+
     /// Extends blocking queue for row batches. Row batches have a property 
that
     /// they must be processed in the order they were produced, even in 
cancellation
     /// paths. Preceding row batches can contain ptrs to memory in subsequent 
row batches
@@ -276,6 +284,10 @@ protected:
 
     std::vector<ExecNode*> _children;
     RowDescriptor _row_descriptor;
+    vectorized::Block _origin_block;
+
+    std::unique_ptr<RowDescriptor> _output_row_descriptor;
+    std::vector<doris::vectorized::VExprContext*> _projections;
 
     /// Resource information sent from the frontend.
     const TBackendResourceProfile _resource_profile;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 9346b947d3..6274b47174 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -339,8 +339,9 @@ Status 
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
     while (!_done) {
         _block->clear_column_data(_plan->row_desc().num_materialized_slots());
         SCOPED_TIMER(profile()->total_time_counter());
-        RETURN_IF_ERROR_AND_CHECK_SPAN(_plan->get_next(_runtime_state.get(), 
_block.get(), &_done),
-                                       _plan->get_next_span(), _done);
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                _plan->get_next_after_projects(_runtime_state.get(), 
_block.get(), &_done),
+                _plan->get_next_span(), _done);
 
         if (_block->rows() > 0) {
             COUNTER_UPDATE(_rows_produced_counter, _block->rows());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 228e0020e8..05040fc2b6 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -892,6 +892,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
         RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, 
_intermediate_row_desc));
     }
     RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, 
_intermediate_row_desc));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
_intermediate_row_desc));
 
     // right table data types
     _right_table_data_types = 
VectorizedUtils::get_data_types(child(1)->row_desc());
@@ -937,8 +938,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
 
         do {
             SCOPED_TIMER(_probe_next_timer);
-            RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, 
&_probe_block, &_probe_eos),
-                                           child(0)->get_next_span(), 
_probe_eos);
+            RETURN_IF_ERROR_AND_CHECK_SPAN(
+                    child(0)->get_next_after_projects(state, &_probe_block, 
&_probe_eos),
+                    child(0)->get_next_span(), _probe_eos);
         } while (_probe_block.rows() == 0 && !_probe_eos);
 
         probe_rows = _probe_block.rows();
@@ -1135,7 +1137,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* 
state) {
         block.clear_column_data();
         RETURN_IF_CANCELLED(state);
 
-        RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+        
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, 
&eos),
                                        child(1)->get_next_span(), eos);
         _mem_used += block.allocated_bytes();
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 00e9b2bc34..0b25890a50 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -175,9 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
                             _tablet_reader_params.delete_predicates.begin()));
 
     // Merge the columns in delete predicate that not in latest schema in to 
current tablet schema
-    for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
-        _tablet_schema->merge_dropped_columns(
-                _tablet->tablet_schema(Version(del_pred_pb.version(), 
del_pred_pb.version())));
+    for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
+        
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
     }
 
     // Range
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index a724c46afe..ae6421e7a6 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -433,7 +433,7 @@ Status AggregationNode::open(RuntimeState* state) {
     while (!eos) {
         RETURN_IF_CANCELLED(state);
         release_block_memory(block);
-        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, 
&eos),
+        
RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next_after_projects(state, 
&block, &eos),
                                        _children[0]->get_next_span(), eos);
         if (block.rows() == 0) {
             continue;
@@ -461,7 +461,7 @@ Status AggregationNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
         do {
             release_block_memory(_preagg_block);
             RETURN_IF_ERROR_AND_CHECK_SPAN(
-                    _children[0]->get_next(state, &_preagg_block, &child_eos),
+                    _children[0]->get_next_after_projects(state, 
&_preagg_block, &child_eos),
                     _children[0]->get_next_span(), child_eos);
         } while (_preagg_block.rows() == 0 && !child_eos);
 
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp 
b/be/src/vec/exec/vanalytic_eval_node.cpp
index dde7cb8453..fc5b224253 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -467,8 +467,9 @@ Status 
VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
     Block block;
     RETURN_IF_CANCELLED(state);
     do {
-        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, 
&_input_eos),
-                                       _children[0]->get_next_span(), 
_input_eos);
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                _children[0]->get_next_after_projects(state, &block, 
&_input_eos),
+                _children[0]->get_next_span(), _input_eos);
     } while (!_input_eos && block.rows() == 0);
 
     if (_input_eos && block.rows() == 0) {
diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp 
b/be/src/vec/exec/vassert_num_rows_node.cpp
index 28e1389bb7..b9239a3c53 100644
--- a/be/src/vec/exec/vassert_num_rows_node.cpp
+++ b/be/src/vec/exec/vassert_num_rows_node.cpp
@@ -51,8 +51,8 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, 
Block* block, bool* eos
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
                                  "VAssertNumRowsNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, block, eos), 
child(0)->get_next_span(),
-                                   *eos);
+    RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, 
block, eos),
+                                   child(0)->get_next_span(), *eos);
     _num_rows_returned += block->rows();
     bool assert_res = false;
     switch (_assertion) {
diff --git a/be/src/vec/exec/vblocking_join_node.cpp 
b/be/src/vec/exec/vblocking_join_node.cpp
index c6d548de91..809da414f4 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -119,8 +119,9 @@ Status VBlockingJoinNode::open(RuntimeState* state) {
     // Seed left child in preparation for get_next().
     while (true) {
         release_block_memory(_left_block);
-        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_left_block, 
&_left_side_eos),
-                                       child(0)->get_next_span(), 
_left_side_eos);
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(0)->get_next_after_projects(state, &_left_block, 
&_left_side_eos),
+                child(0)->get_next_span(), _left_side_eos);
         COUNTER_UPDATE(_left_child_row_counter, _left_block.rows());
         _left_block_pos = 0;
 
diff --git a/be/src/vec/exec/vcross_join_node.cpp 
b/be/src/vec/exec/vcross_join_node.cpp
index 4a8c2f3b17..c3b94f7561 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -60,7 +60,7 @@ Status VCrossJoinNode::construct_build_side(RuntimeState* 
state) {
         RETURN_IF_CANCELLED(state);
 
         Block block;
-        RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+        
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, 
&eos),
                                        child(1)->get_next_span(), eos);
         auto rows = block.rows();
         auto mem_usage = block.allocated_bytes();
@@ -117,7 +117,8 @@ Status VCrossJoinNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
                         release_block_memory(_left_block);
                         timer.stop();
                         RETURN_IF_ERROR_AND_CHECK_SPAN(
-                                child(0)->get_next(state, &_left_block, 
&_left_side_eos),
+                                child(0)->get_next_after_projects(state, 
&_left_block,
+                                                                  
&_left_side_eos),
                                 child(0)->get_next_span(), _left_side_eos);
                         timer.start();
                     } while (_left_block.rows() == 0 && !_left_side_eos);
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 3c180d417d..a2001cff15 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -167,7 +167,7 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
     if (_child_block->rows() == 0) {
         while (_child_block->rows() == 0 && !_child_eos) {
             RETURN_IF_ERROR_AND_CHECK_SPAN(
-                    child(0)->get_next(state, _child_block.get(), &_child_eos),
+                    child(0)->get_next_after_projects(state, 
_child_block.get(), &_child_eos),
                     child(0)->get_next_span(), _child_eos);
         }
 
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index c69f997150..8f561f10a0 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -48,8 +48,9 @@ Status VSelectNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool
     RETURN_IF_CANCELLED(state);
     do {
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, block, 
&_child_eos),
-                                       _children[0]->get_next_span(), 
_child_eos);
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                _children[0]->get_next_after_projects(state, block, 
&_child_eos),
+                _children[0]->get_next_span(), _child_eos);
         if (_child_eos) {
             *eos = true;
             break;
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index b46d9988c5..b95714513c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -242,7 +242,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* 
state) {
         block.clear_column_data();
         SCOPED_TIMER(_build_timer);
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+        
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, 
&eos),
                                        child(0)->get_next_span(), eos);
 
         size_t allocated_bytes = block.allocated_bytes();
@@ -309,8 +309,9 @@ Status VSetOperationNode::process_probe_block(RuntimeState* 
state, int child_id,
     _probe_rows = 0;
 
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR_AND_CHECK_SPAN(child(child_id)->get_next(state, 
&_probe_block, eos),
-                                   child(child_id)->get_next_span(), *eos);
+    RETURN_IF_ERROR_AND_CHECK_SPAN(
+            child(child_id)->get_next_after_projects(state, &_probe_block, 
eos),
+            child(child_id)->get_next_span(), *eos);
     _probe_rows = _probe_block.rows();
     RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns, 
child_id));
     return Status::OK();
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 73618b0b26..8749b2913e 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -125,7 +125,7 @@ Status VSortNode::sort_input(RuntimeState* state) {
     bool eos = false;
     do {
         Block block;
-        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+        
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, 
&eos),
                                        child(0)->get_next_span(), eos);
         auto rows = block.rows();
 
diff --git a/be/src/vec/exec/vtable_function_node.cpp 
b/be/src/vec/exec/vtable_function_node.cpp
index 0852d9d966..49ff5d5a69 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -117,7 +117,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* 
state, Block* output
         if (_child_block->rows() == 0) {
             while (_child_block->rows() == 0 && !_child_eos) {
                 RETURN_IF_ERROR_AND_CHECK_SPAN(
-                        child(0)->get_next(state, _child_block.get(), 
&_child_eos),
+                        child(0)->get_next_after_projects(state, 
_child_block.get(), &_child_eos),
                         child(0)->get_next_span(), _child_eos);
             }
             if (_child_eos && _child_block->rows() == 0) {
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 5ef738ba5b..e9de9c5bcb 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -106,8 +106,9 @@ Status VUnionNode::get_next_pass_through(RuntimeState* 
state, Block* block) {
         _child_eos = false;
     }
     DCHECK_EQ(block->rows(), 0);
-    RETURN_IF_ERROR_AND_CHECK_SPAN(child(_child_idx)->get_next(state, block, 
&_child_eos),
-                                   child(_child_idx)->get_next_span(), 
_child_eos);
+    RETURN_IF_ERROR_AND_CHECK_SPAN(
+            child(_child_idx)->get_next_after_projects(state, block, 
&_child_eos),
+            child(_child_idx)->get_next_span(), _child_eos);
     if (_child_eos) {
         // Even though the child is at eos, it's not OK to close() it here. 
Once we close
         // the child, the row batches that it produced are invalid. Marking 
the batch as
@@ -148,7 +149,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* 
state, Block* block) {
         child_block.clear();
         // The first batch from each child is always fetched here.
         RETURN_IF_ERROR_AND_CHECK_SPAN(
-                child(_child_idx)->get_next(state, &child_block, &_child_eos),
+                child(_child_idx)->get_next_after_projects(state, 
&child_block, &_child_eos),
                 child(_child_idx)->get_next_span(), _child_eos);
         SCOPED_TIMER(_materialize_exprs_evaluate_timer);
         if (child_block.rows() > 0) {
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 6d0de17dac..60a4ad5ce3 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -227,15 +227,15 @@ Status VExpr::create_expr_trees(ObjectPool* pool, const 
std::vector<doris::TExpr
 
 Status VExpr::prepare(const std::vector<VExprContext*>& ctxs, RuntimeState* 
state,
                       const RowDescriptor& row_desc) {
-    for (int i = 0; i < ctxs.size(); ++i) {
-        RETURN_IF_ERROR(ctxs[i]->prepare(state, row_desc));
+    for (auto ctx : ctxs) {
+        RETURN_IF_ERROR(ctx->prepare(state, row_desc));
     }
     return Status::OK();
 }
 
 void VExpr::close(const std::vector<VExprContext*>& ctxs, RuntimeState* state) 
{
-    for (int i = 0; i < ctxs.size(); ++i) {
-        ctxs[i]->close(state);
+    for (auto ctx : ctxs) {
+        ctx->close(state);
     }
 }
 
diff --git a/be/src/vec/functions/like.cpp b/be/src/vec/functions/like.cpp
index 306aa19a7e..86db32c0dd 100644
--- a/be/src/vec/functions/like.cpp
+++ b/be/src/vec/functions/like.cpp
@@ -135,15 +135,14 @@ Status FunctionLikeBase::regexp_fn(LikeSearchState* 
state, const StringValue& va
 Status FunctionLikeBase::hs_prepare(FunctionContext* context, const char* 
expression,
                                     hs_database_t** database, hs_scratch_t** 
scratch) {
     hs_compile_error_t* compile_err;
-
-    if (hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY, 
HS_MODE_BLOCK, NULL, database,
-                   &compile_err) != HS_SUCCESS) {
+    auto res = hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY, 
HS_MODE_BLOCK, NULL,
+                          database, &compile_err);
+    if (res != HS_SUCCESS) {
         *database = nullptr;
         if (context) context->set_error("hs_compile regex pattern error");
-        auto status = Status::RuntimeError("hs_compile regex pattern error:" +
-                                           std::string(compile_err->message));
+        return Status::RuntimeError("hs_compile regex pattern error:" +
+                                    std::string(compile_err->message));
         hs_free_compile_error(compile_err);
-        return status;
     }
     hs_free_compile_error(compile_err);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 22eae97c1d..a3fb63f0e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -338,6 +338,7 @@ public class SlotDescriptor {
         
builder.append(prefix).append("byteOffset=").append(byteOffset).append("\n");
         
builder.append(prefix).append("nullIndicatorByte=").append(nullIndicatorByte).append("\n");
         
builder.append(prefix).append("nullIndicatorBit=").append(nullIndicatorBit).append("\n");
+        
builder.append(prefix).append("nullable=").append(isNullable).append("\n");
         builder.append(prefix).append("slotIdx=").append(slotIdx).append("\n");
         return builder.toString();
     }
@@ -345,4 +346,9 @@ public class SlotDescriptor {
     public boolean isScanSlot() {
         return parent.getTable() instanceof OlapTable;
     }
+
+    public void setMaterialized(boolean materialized) {
+        isMaterialized = materialized;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
index 5a40a8cb0b..43d9975e43 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
@@ -258,9 +258,10 @@ public class ExpressionTranslator extends 
DefaultExpressionVisitor<Expr, PlanTra
 
     @Override
     public Expr visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, 
PlanTranslatorContext context) {
-        return new ArithmeticExpr(binaryArithmetic.getLegacyOperator(),
+        ArithmeticExpr arithmeticExpr =  new 
ArithmeticExpr(binaryArithmetic.getLegacyOperator(),
                 binaryArithmetic.child(0).accept(this, context),
                 binaryArithmetic.child(1).accept(this, context));
+        return arithmeticExpr;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 4090a09e5b..890b2e2cbf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.BaseTableRef;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TableName;
@@ -41,7 +42,6 @@ import 
org.apache.doris.nereids.trees.expressions.functions.AggregateFunction;
 import org.apache.doris.nereids.trees.plans.AggPhase;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
@@ -126,24 +126,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         if (rootFragment.isPartitioned() && 
rootFragment.getPlanRoot().getNumInstances() > 1) {
             rootFragment = exchangeToMergeFragment(rootFragment, context);
         }
-        // TODO: trick here, we need push project down
-        if (physicalPlan.getType() == PlanType.PHYSICAL_PROJECT) {
-            PhysicalProject<Plan> physicalProject = (PhysicalProject<Plan>) 
physicalPlan;
-            List<Expr> outputExprs = physicalProject.getProjects().stream()
-                    .map(e -> ExpressionTranslator.translate(e, context))
-                    .collect(Collectors.toList());
-            rootFragment.setOutputExprs(outputExprs);
-        } else {
-            List<Expr> outputExprs = Lists.newArrayList();
-            physicalPlan.getOutput().stream().map(Slot::getExprId)
-                    .forEach(exprId -> 
outputExprs.add(context.findSlotRef(exprId)));
-            rootFragment.setOutputExprs(outputExprs);
-        }
+        List<Expr> outputExprs = Lists.newArrayList();
+        physicalPlan.getOutput().stream().map(Slot::getExprId)
+                .forEach(exprId -> 
outputExprs.add(context.findSlotRef(exprId)));
+        rootFragment.setOutputExprs(outputExprs);
         rootFragment.getPlanRoot().convertToVectoriezd();
         for (PlanFragment fragment : context.getPlanFragments()) {
             fragment.finalize(null);
         }
         Collections.reverse(context.getPlanFragments());
+        context.getDescTable().computeMemLayout();
         return rootFragment;
     }
 
@@ -240,10 +232,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // Create OlapScanNode
         List<Slot> slotList = olapScan.getOutput();
         OlapTable olapTable = olapScan.getTable();
-        List<Expr> execConjunctsList = olapScan
-                .getExpressions()
-                .stream()
-                .map(e -> ExpressionTranslator.translate(e, 
context)).collect(Collectors.toList());
         TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, 
olapTable, context);
         tupleDescriptor.setTable(olapTable);
         OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), 
tupleDescriptor, olapTable.getName());
@@ -259,12 +247,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             throw new AnalysisException(e.getMessage());
         }
         Utils.execWithUncheckedException(olapScanNode::init);
-        olapScanNode.addConjuncts(execConjunctsList);
         context.addScanNode(olapScanNode);
         // Create PlanFragment
         // TODO: add data partition after we have physical properties
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
olapScanNode, DataPartition.RANDOM);
         context.addPlanFragment(planFragment);
+        // TODO: Nereids support duplicate table only for now, remove this 
when support aggregate/unique table.
+        olapScanNode.setIsPreAggregation(true, "Nereids support duplicate 
table only for now");
         return planFragment;
     }
 
@@ -437,9 +426,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
     }
 
-    // TODO: generate expression mapping when be project could do in ExecNode
+    // TODO: generate expression mapping when be project could do in ExecNode.
     @Override
-    public PlanFragment visitPhysicalProject(PhysicalProject<Plan> project, 
PlanTranslatorContext context) {
+    public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> 
project, PlanTranslatorContext context) {
         PlanFragment inputFragment = project.child(0).accept(this, context);
 
         // TODO: handle p.child(0) is not NamedExpression.
@@ -453,28 +442,66 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .map(e -> ExpressionTranslator.translate(e, context))
                 .collect(Collectors.toList());
         // TODO: fix the project alias of an aliased relation.
+        List<Slot> slotList = project.getOutput();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, 
context);
         PlanNode inputPlanNode = inputFragment.getPlanRoot();
+        // For hash join node, use vSrcToOutputSMap to describe the expression 
calculation, use
+        // vIntermediateTupleDescList as input, and set vOutputTupleDesc as 
the final output.
+        // TODO: HashJoinNode's be implementation is not support projection 
yet, remove this after when supported.
+        if (inputPlanNode instanceof HashJoinNode) {
+            HashJoinNode hashJoinNode = (HashJoinNode) inputPlanNode;
+            hashJoinNode.setvOutputTupleDesc(tupleDescriptor);
+            hashJoinNode.setvSrcToOutputSMap(execExprList);
+            return inputFragment;
+        }
+        inputPlanNode.setProjectList(execExprList);
+        inputPlanNode.setOutputTupleDesc(tupleDescriptor);
+
         List<Expr> predicateList = inputPlanNode.getConjuncts();
         Set<Integer> requiredSlotIdList = new HashSet<>();
         for (Expr expr : predicateList) {
             extractExecSlot(expr, requiredSlotIdList);
         }
         for (Expr expr : execExprList) {
-            if (expr instanceof SlotRef) {
-                requiredSlotIdList.add(((SlotRef) 
expr).getDesc().getId().asInt());
-            }
+            extractExecSlot(expr, requiredSlotIdList);
+        }
+        if (inputPlanNode instanceof OlapScanNode) {
+            updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdList, 
context);
         }
         return inputFragment;
     }
 
+    private void updateChildSlotsMaterialization(PlanNode execPlan,
+            Set<Integer> requiredSlotIdList,
+            PlanTranslatorContext context) {
+        Set<SlotRef> slotRefSet = new HashSet<>();
+        for (Expr expr : execPlan.getConjuncts()) {
+            expr.collect(SlotRef.class, slotRefSet);
+        }
+        Set<Integer> slotIdSet = slotRefSet.stream()
+                
.map(SlotRef::getSlotId).map(SlotId::asInt).collect(Collectors.toSet());
+        slotIdSet.addAll(requiredSlotIdList);
+        execPlan.getTupleIds().stream()
+                .map(context::getTupleDesc)
+                .map(TupleDescriptor::getSlots)
+                .flatMap(List::stream)
+                .forEach(s -> 
s.setIsMaterialized(slotIdSet.contains(s.getId().asInt())));
+    }
+
     @Override
     public PlanFragment visitPhysicalFilter(PhysicalFilter<Plan> filter, 
PlanTranslatorContext context) {
         PlanFragment inputFragment = filter.child(0).accept(this, context);
         PlanNode planNode = inputFragment.getPlanRoot();
+        addConjunctsToPlanNode(filter, planNode, context);
+        return inputFragment;
+    }
+
+    private void addConjunctsToPlanNode(PhysicalFilter<Plan> filter,
+            PlanNode planNode,
+            PlanTranslatorContext context) {
         Expression expression = filter.getPredicates();
         List<Expression> expressionList = 
ExpressionUtils.extractConjunction(expression);
         expressionList.stream().map(e -> ExpressionTranslator.translate(e, 
context)).forEach(planNode::addConjunct);
-        return inputFragment;
     }
 
     @Override
@@ -600,4 +627,5 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         context.addPlanFragment(fragment);
         return fragment;
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 806bc1475c..c412bedabe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -22,10 +22,10 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.nereids.trees.expressions.ExprId;
-import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
@@ -122,17 +122,10 @@ public class PlanTranslatorContext {
         
slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
         slotDescriptor.setIsMaterialized(true);
         this.addExprIdSlotRefPair(slotReference.getExprId(), new 
SlotRef(slotDescriptor));
+        slotDescriptor.setIsNullable(slotReference.nullable());
         return slotDescriptor;
     }
 
-    /**
-     * Create slotDesc with Expression.
-     */
-    public void createSlotDesc(TupleDescriptor tupleDesc, Expression 
expression) {
-        SlotDescriptor slotDescriptor = this.addSlotDesc(tupleDesc);
-        slotDescriptor.setType(expression.getDataType().toCatalogDataType());
-    }
-
     /**
      * in Nereids, all node only has one TupleDescriptor, so we can use the 
first one.
      *
@@ -144,6 +137,10 @@ public class PlanTranslatorContext {
         return descTable.getTupleDesc(planNode.getOutputTupleIds().get(0));
     }
 
+    public TupleDescriptor getTupleDesc(TupleId tupleId) {
+        return descTable.getTupleDesc(tupleId);
+    }
+
     public DescriptorTable getDescTable() {
         return descTable;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
index 856773028d..6531376177 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
@@ -29,6 +29,6 @@ public enum JobType {
     APPLY_RULE,
     DERIVE_STATS,
     TOP_DOWN_REWRITE,
-    BOTTOM_UP_REWRITE,
+    BOTTOM_UP_REWRITE
     ;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
index 8a3e5b4c83..274929db75 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.Job;
 import 
org.apache.doris.nereids.rules.expression.rewrite.ExpressionNormalization;
 import org.apache.doris.nereids.rules.rewrite.AggregateDisassemble;
+import org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
 import org.apache.doris.nereids.rules.rewrite.logical.FindHashConditionForJoin;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits;
@@ -50,6 +51,7 @@ public class RewriteJob extends BatchRulesJob {
                 .add(topDownBatch(ImmutableList.of(new 
FindHashConditionForJoin())))
                 .add(topDownBatch(ImmutableList.of(new 
PushPredicateThroughJoin())))
                 .add(topDownBatch(ImmutableList.of(new 
AggregateDisassemble())))
+                .add(topDownBatch(ImmutableList.of(new ColumnPruning())))
                 .add(topDownBatch(ImmutableList.of(new 
SwapFilterAndProject())))
                 .add(bottomUpBatch(ImmutableList.of(new 
MergeConsecutiveProjects())))
                 .add(topDownBatch(ImmutableList.of(new 
MergeConsecutiveFilters())))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index 6b03b68182..286dc12eff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -172,7 +172,7 @@ public class StatsCalculator extends 
DefaultPlanVisitor<StatsDeriveResult, Void>
 
     // TODO: We should subtract those pruned column, and consider the 
expression transformations in the node.
     @Override
-    public StatsDeriveResult visitPhysicalProject(PhysicalProject<Plan> 
project, Void context) {
+    public StatsDeriveResult visitPhysicalProject(PhysicalProject<? extends 
Plan> project, Void context) {
         return computeProject(project);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index 818ef3796e..173b363799 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Objects;
@@ -45,8 +43,6 @@ public abstract class Expression extends 
AbstractTreeNode<Expression> {
 
     private static final String INPUT_CHECK_ERROR_MESSAGE = "argument %d 
requires %s type, however '%s' is of %s type";
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
     public Expression(Expression... children) {
         super(children);
     }
@@ -147,23 +143,4 @@ public abstract class Expression extends 
AbstractTreeNode<Expression> {
         return 0;
     }
 
-    /**
-     * Return true if all the SlotRef in the expr tree is bound to the same 
column.
-     */
-    public boolean boundToColumn(String column) {
-        for (Expression child : children) {
-            if (!child.boundToColumn(column)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public Expression leftMostNode() {
-        Expression leftChild = this;
-        while (leftChild.children.size() > 0) {
-            leftChild = leftChild.child(0);
-        }
-        return leftChild;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
index e8e04e901a..24a601a2e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
@@ -155,8 +155,4 @@ public class SlotReference extends Slot {
         return new SlotReference(exprId, name, dataType, nullable, qualifiers);
     }
 
-    @Override
-    public boolean boundToColumn(String name) {
-        return this.name.equals(name);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index 5c7564b50d..06527a1b78 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.AbstractTreeNode;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.statistics.StatsDeriveResult;
 
 import org.apache.commons.lang3.StringUtils;
@@ -119,4 +120,15 @@ public abstract class AbstractPlan extends 
AbstractTreeNode<Plan> implements Pla
     public int hashCode() {
         return Objects.hash(statsDeriveResult, logicalProperties);
     }
+
+    @Override
+    public List<Slot> getOutput() {
+        return logicalProperties.getOutput();
+    }
+
+    @Override
+    public Plan child(int index) {
+        return super.child(index);
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
index eaf5c56b44..cfc8d1daf2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
@@ -98,4 +98,5 @@ public class GroupPlan extends LogicalLeaf {
     public String toString() {
         return "GroupPlan( " + group.getGroupId() + " )";
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
index a6440252de..68cea493dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
@@ -49,10 +49,6 @@ public interface Plan extends TreeNode<Plan> {
 
     List<Slot> getOutput();
 
-    default List<Slot> computeOutput(Plan... inputs) {
-        throw new IllegalStateException("Not support compute output for " + 
getClass().getName());
-    }
-
     String treeString();
 
     default Plan withOutput(List<Slot> output) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 4b2349fbca..a14797ec88 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -20,12 +20,10 @@ package org.apache.doris.nereids.trees.plans.physical;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -59,11 +57,6 @@ public abstract class AbstractPhysicalPlan extends 
AbstractPlan implements Physi
         this.physicalProperties = PhysicalProperties.ANY;
     }
 
-    @Override
-    public List<Slot> getOutput() {
-        return logicalProperties.getOutput();
-    }
-
     @Override
     public LogicalProperties getLogicalProperties() {
         return logicalProperties;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index 6d75bf5819..cc5d914c1a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -180,7 +180,7 @@ public abstract class PlanVisitor<R, C> {
         return visit(nestedLoopJoin, context);
     }
 
-    public R visitPhysicalProject(PhysicalProject<Plan> project, C context) {
+    public R visitPhysicalProject(PhysicalProject<? extends Plan> project, C 
context) {
         return visit(project, context);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index ad2cc2f5fc..5cae77c483 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -23,6 +23,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
@@ -83,6 +84,7 @@ public class ExchangeNode extends PlanNode {
             limit = inputNode.limit;
         }
         computeTupleIds();
+
     }
 
     public boolean isMergingExchange() {
@@ -94,8 +96,15 @@ public class ExchangeNode extends PlanNode {
 
     @Override
     public final void computeTupleIds() {
-        clearTupleIds();
-        tupleIds.addAll(getChild(0).getTupleIds());
+        PlanNode inputNode = getChild(0);
+        TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
+        if (outputTupleDesc != null) {
+            tupleIds.clear();
+            tupleIds.add(outputTupleDesc.getId());
+        } else {
+            clearTupleIds();
+            tupleIds.addAll(getChild(0).getTupleIds());
+        }
         tblRefIds.addAll(getChild(0).getTblRefIds());
         nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 9d0a13e24d..46e35775da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -1080,13 +1080,17 @@ public class HashJoinNode extends PlanNode {
         }
         if (vSrcToOutputSMap != null) {
             for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
+                // TODO: Enable it after we support new optimizers
+                // if 
(ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
+                //     
msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
+                // } else
                 
msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
-                
msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
             }
         }
         if (vOutputTupleDesc != null) {
             
msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt());
-            msg.setOutputTupleId(vOutputTupleDesc.getId().asInt());
+            // TODO Enable it after we support new optimizers
+            // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt());
         }
         if (vIntermediateTupleDescList != null) {
             for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) 
{
@@ -1257,4 +1261,25 @@ public class HashJoinNode extends PlanNode {
         }
         return true;
     }
+
+    /**
+     * Used by nereids.
+     */
+    public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) {
+        this.vOutputTupleDesc = vOutputTupleDesc;
+    }
+
+    /**
+     * Used by nereids.
+     */
+    public void setvIntermediateTupleDescList(List<TupleDescriptor> 
vIntermediateTupleDescList) {
+        this.vIntermediateTupleDescList = vIntermediateTupleDescList;
+    }
+
+    /**
+     * Used by nereids.
+     */
+    public void setvSrcToOutputSMap(List<Expr> lhs) {
+        this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, 
Collections.emptyList());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 967895ee4e..3c82be05f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -319,7 +319,6 @@ public class OlapScanNode extends ScanNode {
             }
             situation = "The key type of table is aggregated.";
             update = false;
-            break CHECK;
         } // CHECKSTYLE IGNORE THIS LINE
 
         if (update) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index c25282da83..596897c947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -49,8 +49,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -74,7 +72,6 @@ import java.util.Set;
  * its children (= are bound by tupleIds).
  */
 public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats 
{
-    private static final Logger LOG = LogManager.getLogger(PlanNode.class);
 
     protected String planNodeName;
 
@@ -142,6 +139,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
     protected StatisticalType statisticalType = StatisticalType.DEFAULT;
     protected StatsDeriveResult statsDeriveResult;
 
+    protected TupleDescriptor outputTupleDesc;
+
+    protected List<Expr> projectList;
+
     protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String 
planNodeName,
             StatisticalType statisticalType) {
         this.id = id;
@@ -550,6 +551,14 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         }
         toThrift(msg);
         container.addToNodes(msg);
+        if (projectList != null) {
+            for (Expr expr : projectList) {
+                msg.addToProjections(expr.treeToThrift());
+            }
+        }
+        if (outputTupleDesc != null) {
+            msg.setOutputTupleId(outputTupleDesc.getId().asInt());
+        }
         if (this instanceof ExchangeNode) {
             msg.num_children = 0;
             return;
@@ -1009,4 +1018,20 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
     public void finalizeForNereids() {
 
     }
+
+    public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
+        this.outputTupleDesc = outputTupleDesc;
+    }
+
+    public TupleDescriptor getOutputTupleDesc() {
+        return outputTupleDesc;
+    }
+
+    public void setProjectList(List<Expr> projectList) {
+        this.projectList = projectList;
+    }
+
+    public List<Expr> getProjectList() {
+        return projectList;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b93f7f15c0..58e4a02435 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1144,7 +1144,7 @@ public class SingleNodePlanner {
                 MaterializedViewSelector.BestIndexInfo bestIndexInfo
                         = materializedViewSelector.selectBestMV(olapScanNode);
                 if (bestIndexInfo == null) {
-                    selectFailed |= true;
+                    selectFailed = true;
                     TupleId tupleId = olapScanNode.getTupleId();
                     selectStmt.updateDisableTuplesMVRewriter(tupleId);
                     LOG.debug("MV rewriter of tuple [] will be disable", 
tupleId);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
new file mode 100644
index 0000000000..8762ce125b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.glue.translator;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class PhysicalPlanTranslatorTest {
+
+    @Test
+    public void testOlapPrune(@Mocked OlapTable t1, @Injectable 
LogicalProperties placeHolder) throws Exception {
+        List<String> qualifierList = new ArrayList<>();
+        qualifierList.add("test");
+        qualifierList.add("t1");
+        List<Slot> t1Output = new ArrayList<>();
+        SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
+        SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
+        SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE);
+        t1Output.add(col1);
+        t1Output.add(col2);
+        t1Output.add(col3);
+        LogicalProperties t1Properties = new LogicalProperties(() -> t1Output);
+        PhysicalOlapScan scan = new PhysicalOlapScan(t1, qualifierList, 0L,
+                Collections.emptyList(), Collections.emptyList(), null,
+                Optional.empty(),
+                t1Properties);
+        Literal t1FilterRight = new IntegerLiteral(1);
+        Expression t1FilterExpr = new GreaterThan(col1, t1FilterRight);
+        PhysicalFilter<PhysicalOlapScan> filter =
+                new PhysicalFilter(t1FilterExpr, placeHolder, scan);
+        List<NamedExpression> projList = new ArrayList<>();
+        projList.add(col2);
+        PhysicalProject<PhysicalFilter> project = new PhysicalProject(projList,
+                placeHolder, filter);
+        PlanTranslatorContext planTranslatorContext = new 
PlanTranslatorContext();
+        PhysicalPlanTranslator translator = new PhysicalPlanTranslator();
+        PlanFragment fragment = translator.visitPhysicalProject(project, 
planTranslatorContext);
+        PlanNode planNode = fragment.getPlanRoot();
+        List<OlapScanNode> scanNodeList = new ArrayList<>();
+        planNode.collect(OlapScanNode.class::isInstance, scanNodeList);
+        Assertions.assertEquals(2, 
scanNodeList.get(0).getTupleDesc().getMaterializedSlots().size());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
index 2a4a9343e6..67f6bc43ea 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
@@ -40,14 +40,14 @@ public class PlanRewriter {
     public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext 
connectContext, RuleFactory... rules) {
         return new Memo(plan)
                 .newCascadesContext(new StatementContext(connectContext, new 
OriginStatement("", 0)))
-                .topDownRewrite(rules)
+                .bottomUpRewrite(rules)
                 .getMemo();
     }
 
     public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext 
connectContext, Rule... rules) {
         return new Memo(plan)
                 .newCascadesContext(new StatementContext(connectContext, new 
OriginStatement("", 0)))
-                .topDownRewrite(rules)
+                .bottomUpRewrite(rules)
                 .getMemo();
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
index bd3d1f3866..2fe0085883 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
@@ -90,6 +90,8 @@ public class DistributedPlannerTest {
         Deencapsulation.setField(inputPlanRoot, "conjuncts", 
Lists.newArrayList());
         new Expectations() {
             {
+                inputPlanRoot.getOutputTupleDesc();
+                result = null;
                 inputFragment.isPartitioned();
                 result = true;
                 plannerContext.getNextNodeId();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to