This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f79c0ddaed9 [branch-4.0](pick)expr execution now returns a Column 
directly instead of inserting into a Block  (#58227)
f79c0ddaed9 is described below

commit f79c0ddaed9b3c370d7d88741520eeb01b43e1d3
Author: Mryange <[email protected]>
AuthorDate: Fri Nov 21 23:51:42 2025 +0800

    [branch-4.0](pick)expr execution now returns a Column directly instead of 
inserting into a Block  (#58227)
    
    ### What problem does this PR solve?
    https://github.com/apache/doris/pull/57486
    https://github.com/apache/doris/pull/57811
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/operator.cpp                  |  6 +-
 be/src/vec/core/block.h                            |  3 +
 be/src/vec/exec/scan/scanner.cpp                   |  7 +-
 be/src/vec/exprs/lambda_function/lambda_function.h |  4 +-
 .../lambda_function/varray_filter_function.cpp     | 40 ++++------
 .../exprs/lambda_function/varray_map_function.cpp  | 83 +++++++++-----------
 be/src/vec/exprs/vbitmap_predicate.cpp             | 25 +++---
 be/src/vec/exprs/vbitmap_predicate.h               |  3 +-
 be/src/vec/exprs/vbloom_predicate.cpp              | 25 +++---
 be/src/vec/exprs/vbloom_predicate.h                |  3 +-
 be/src/vec/exprs/vcase_expr.cpp                    | 32 +++-----
 be/src/vec/exprs/vcase_expr.h                      |  3 +-
 be/src/vec/exprs/vcast_expr.cpp                    | 74 +++++++++---------
 be/src/vec/exprs/vcast_expr.h                      |  6 +-
 be/src/vec/exprs/vcolumn_ref.h                     | 10 ++-
 be/src/vec/exprs/vcompound_pred.h                  | 91 ++++++++--------------
 be/src/vec/exprs/vdirect_in_predicate.h            | 39 ++++------
 be/src/vec/exprs/vectorized_fn_call.cpp            | 55 +++++++------
 be/src/vec/exprs/vectorized_fn_call.h              | 12 +--
 be/src/vec/exprs/vexpr.cpp                         | 21 ++---
 be/src/vec/exprs/vexpr.h                           | 32 +++++---
 be/src/vec/exprs/vexpr_context.cpp                 |  6 ++
 be/src/vec/exprs/vexpr_context.h                   |  2 +
 be/src/vec/exprs/vin_predicate.cpp                 | 33 ++++----
 be/src/vec/exprs/vin_predicate.h                   |  3 +-
 be/src/vec/exprs/vinfo_func.cpp                    |  8 +-
 be/src/vec/exprs/vinfo_func.h                      |  3 +-
 be/src/vec/exprs/virtual_slot_ref.cpp              | 24 +++---
 be/src/vec/exprs/virtual_slot_ref.h                |  3 +-
 be/src/vec/exprs/vlambda_function_call_expr.h      |  5 +-
 be/src/vec/exprs/vlambda_function_expr.h           |  9 ++-
 be/src/vec/exprs/vliteral.cpp                      |  8 +-
 be/src/vec/exprs/vliteral.h                        |  3 +-
 be/src/vec/exprs/vmatch_predicate.cpp              | 28 ++++---
 be/src/vec/exprs/vmatch_predicate.h                |  3 +-
 be/src/vec/exprs/vruntimefilter_wrapper.cpp        | 19 ++---
 be/src/vec/exprs/vruntimefilter_wrapper.h          |  3 +-
 be/src/vec/exprs/vsearch.cpp                       |  5 +-
 be/src/vec/exprs/vsearch.h                         |  3 +-
 be/src/vec/exprs/vslot_ref.cpp                     | 20 +++++
 be/src/vec/exprs/vslot_ref.h                       |  3 +
 be/src/vec/exprs/vtopn_pred.h                      | 38 +++++----
 be/src/vec/functions/cast/cast_to_variant.h        |  2 +-
 be/src/vec/functions/cast/function_cast.cpp        |  2 +-
 be/src/vec/functions/function.cpp                  | 71 ++++++++---------
 be/src/vec/functions/function.h                    | 30 +++----
 be/src/vec/functions/function_rpc.h                |  2 +-
 be/test/exprs/mock_vexpr.h                         |  2 +
 be/test/exprs/virtual_slot_ref_test.cpp            | 15 ++++
 be/test/olap/collection_statistics_test.cpp        |  5 ++
 be/test/vec/exprs/try_cast_expr_test.cpp           | 21 ++++-
 be/test/vec/exprs/vsearch_expr_test.cpp            |  4 +
 52 files changed, 493 insertions(+), 464 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 5d48c64e814..6f0eb562f0f 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -351,9 +351,9 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
         DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << 
debug_string();
         for (int i = 0; i < mutable_columns.size(); ++i) {
             auto result_column_id = -1;
-            
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, 
&result_column_id));
-            auto column_ptr = input_block.get_by_position(result_column_id)
-                                      
.column->convert_to_full_column_if_const();
+            ColumnPtr column_ptr;
+            
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, 
column_ptr));
+            column_ptr = column_ptr->convert_to_full_column_if_const();
             if (result_column_id >= origin_columns_count) {
                 bytes_usage += column_ptr->allocated_bytes();
             }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index b99f2ec09ad..eb3849fa31a 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -123,6 +123,9 @@ public:
         }
         return name_to_index_map;
     }
+    // Use this method only when you are certain index_by_name will not be used
+    // This is a temporary compromise; index_by_name may be removed in the 
future
+    void simple_insert(const ColumnWithTypeAndName& elem) { 
data.emplace_back(elem); }
 
     /// References are invalidated after calling functions above.
     ColumnWithTypeAndName& get_by_position(size_t position) {
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index c9391960c8e..d5ee62c2b79 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -177,10 +177,9 @@ Status Scanner::_do_projections(vectorized::Block* 
origin_block, vectorized::Blo
     DCHECK_EQ(mutable_columns.size(), _projections.size());
 
     for (int i = 0; i < mutable_columns.size(); ++i) {
-        auto result_column_id = -1;
-        RETURN_IF_ERROR(_projections[i]->execute(&input_block, 
&result_column_id));
-        auto column_ptr = input_block.get_by_position(result_column_id)
-                                  .column->convert_to_full_column_if_const();
+        ColumnPtr column_ptr;
+        RETURN_IF_ERROR(_projections[i]->execute(&input_block, column_ptr));
+        column_ptr = column_ptr->convert_to_full_column_if_const();
         if (mutable_columns[i]->is_nullable() != column_ptr->is_nullable()) {
             throw Exception(ErrorCode::INTERNAL_ERROR, "Nullable mismatch");
         }
diff --git a/be/src/vec/exprs/lambda_function/lambda_function.h 
b/be/src/vec/exprs/lambda_function/lambda_function.h
index 9e141549042..da83a662c73 100644
--- a/be/src/vec/exprs/lambda_function/lambda_function.h
+++ b/be/src/vec/exprs/lambda_function/lambda_function.h
@@ -36,8 +36,8 @@ public:
         return Status::OK();
     }
 
-    virtual doris::Status execute(VExprContext* context, 
doris::vectorized::Block* block,
-                                  int* result_column_id, const DataTypePtr& 
result_type,
+    virtual doris::Status execute(VExprContext* context, const 
doris::vectorized::Block* block,
+                                  ColumnPtr& result_column, const DataTypePtr& 
result_type,
                                   const VExprSPtrs& children) const = 0;
 
     int batch_size;
diff --git a/be/src/vec/exprs/lambda_function/varray_filter_function.cpp 
b/be/src/vec/exprs/lambda_function/varray_filter_function.cpp
index 4c3021a76be..23717eb6d10 100644
--- a/be/src/vec/exprs/lambda_function/varray_filter_function.cpp
+++ b/be/src/vec/exprs/lambda_function/varray_filter_function.cpp
@@ -52,24 +52,23 @@ public:
 
     std::string get_name() const override { return name; }
 
-    doris::Status execute(VExprContext* context, doris::vectorized::Block* 
block,
-                          int* result_column_id, const DataTypePtr& 
result_type,
+    doris::Status execute(VExprContext* context, const 
doris::vectorized::Block* block,
+                          ColumnPtr& result_column, const DataTypePtr& 
result_type,
                           const VExprSPtrs& children) const override {
         ///* array_filter(array, array<boolean>) *///
 
         //1. child[0:end]->execute(src_block)
-        doris::vectorized::ColumnNumbers arguments(children.size());
-        for (int i = 0; i < children.size(); ++i) {
-            int column_id = -1;
-            RETURN_IF_ERROR(children[i]->execute(context, block, &column_id));
-            arguments[i] = column_id;
-        }
+
+        DCHECK_EQ(children.size(), 2);
+        ColumnPtr column_ptr_0;
+        RETURN_IF_ERROR(children[0]->execute_column(context, block, 
column_ptr_0));
+        ColumnPtr column_ptr_1;
+        RETURN_IF_ERROR(children[1]->execute_column(context, block, 
column_ptr_1));
 
         //2. get first and second array column
-        auto first_column =
-                
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
-        auto second_column =
-                
block->get_by_position(arguments[1]).column->convert_to_full_column_if_const();
+        auto first_column = column_ptr_0->convert_to_full_column_if_const();
+
+        auto second_column = column_ptr_1->convert_to_full_column_if_const();
 
         auto input_rows = first_column->size();
         auto first_outside_null_map = ColumnUInt8::create(input_rows, 0);
@@ -148,24 +147,19 @@ public:
         }
         
first_nested_nullable_column.append_data_by_selector(result_data_column, 
selector);
 
-        //4. insert the result column to block
-        ColumnWithTypeAndName result_arr;
+        //4. return result column
         if (result_type->is_nullable()) {
-            result_arr = {
+            result_column =
                     
ColumnNullable::create(ColumnArray::create(std::move(result_data_column),
                                                                
std::move(result_offset_column)),
-                                           std::move(first_outside_null_map)),
-                    result_type, "array_filter_result"};
-
+                                           std::move(first_outside_null_map));
         } else {
             DCHECK(!first_column->is_nullable());
             DCHECK(!second_column->is_nullable());
-            result_arr = {ColumnArray::create(std::move(result_data_column),
-                                              std::move(result_offset_column)),
-                          result_type, "array_filter_result"};
+            result_column = ColumnArray::create(std::move(result_data_column),
+                                                
std::move(result_offset_column));
         }
-        block->insert(std::move(result_arr));
-        *result_column_id = block->columns() - 1;
+
         return Status::OK();
     }
 };
diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp 
b/be/src/vec/exprs/lambda_function/varray_map_function.cpp
index 0b94568b84a..914ec6eb930 100644
--- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp
+++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp
@@ -33,6 +33,7 @@
 #include "vec/core/block.h"
 #include "vec/core/column_numbers.h"
 #include "vec/core/column_with_type_and_name.h"
+#include "vec/core/columns_with_type_and_name.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -76,7 +77,7 @@ public:
 
     std::string get_name() const override { return name; }
 
-    Status execute(VExprContext* context, vectorized::Block* block, int* 
result_column_id,
+    Status execute(VExprContext* context, const vectorized::Block* block, 
ColumnPtr& result_column,
                    const DataTypePtr& result_type, const VExprSPtrs& children) 
const override {
         LambdaArgs args_info;
         // collect used slot ref in lambda function body
@@ -107,19 +108,18 @@ public:
 
         ///* array_map(lambda,arg1,arg2,.....) *///
         //1. child[1:end]->execute(src_block)
-        doris::vectorized::ColumnNumbers arguments(children.size() - 1);
+        ColumnsWithTypeAndName arguments(children.size() - 1);
         for (int i = 1; i < children.size(); ++i) {
-            int column_id = -1;
-            RETURN_IF_ERROR(children[i]->execute(context, block, &column_id));
-            arguments[i - 1] = column_id;
+            ColumnPtr column;
+            RETURN_IF_ERROR(children[i]->execute_column(context, block, 
column));
+            arguments[i - 1].column = column;
+            arguments[i - 1].type = children[i]->execute_type(block);
+            arguments[i - 1].name = children[i]->expr_name();
         }
 
         // used for save column array outside null map
-        auto outside_null_map =
-                ColumnUInt8::create(block->get_by_position(arguments[0])
-                                            
.column->convert_to_full_column_if_const()
-                                            ->size(),
-                                    0);
+        auto outside_null_map = ColumnUInt8::create(
+                
arguments[0].column->convert_to_full_column_if_const()->size(), 0);
         // offset column
         MutableColumnPtr array_column_offset;
         size_t nested_array_column_rows = 0;
@@ -128,7 +128,7 @@ public:
         std::vector<ColumnPtr> lambda_datas(arguments.size());
 
         for (int i = 0; i < arguments.size(); ++i) {
-            const auto& array_column_type_name = 
block->get_by_position(arguments[i]);
+            const auto& array_column_type_name = arguments[i];
             auto column_array = 
array_column_type_name.column->convert_to_full_column_if_const();
             auto type_array = array_column_type_name.type;
             if (type_array->is_nullable()) {
@@ -185,7 +185,6 @@ public:
             data_types.push_back(col_type.get_nested_type());
         }
 
-        ColumnWithTypeAndName result_arr;
         // if column_array is NULL, we know the array_data_column will not 
write any data,
         // so the column is empty. eg : (x) -> concat('|',x + "1"). if still 
execute the lambda function, will cause the bolck rows are not equal
         // the x column is empty, but "|" is const literal, size of column is 
1, so the block rows is 1, but the x column is empty, will be coredump.
@@ -206,21 +205,17 @@ public:
                                                            
std::move(array_column_offset));
 
             if (is_nullable) {
-                result_arr = 
{ColumnNullable::create(std::move(result_array_column),
-                                                     
std::move(outside_null_map)),
-                              result_type, "Result"};
+                result_column = 
ColumnNullable::create(std::move(result_array_column),
+                                                       
std::move(outside_null_map));
             } else {
-                result_arr = {std::move(result_array_column), result_type, 
"Result"};
+                result_column = std::move(result_array_column);
             }
 
-            block->insert(result_arr);
-            *result_column_id = block->columns() - 1;
             return Status::OK();
         }
 
         MutableColumnPtr result_col = nullptr;
         DataTypePtr res_type;
-        std::string res_name;
 
         //process first row
         args_info.array_start = 
(*args_info.offsets_ptr)[args_info.current_row_idx - 1];
@@ -286,12 +281,12 @@ public:
                 }
             }
             //3. child[0]->execute(new_block)
-            RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, 
result_column_id));
 
-            auto res_col = lambda_block.get_by_position(*result_column_id)
-                                   .column->convert_to_full_column_if_const();
-            res_type = lambda_block.get_by_position(*result_column_id).type;
-            res_name = lambda_block.get_by_position(*result_column_id).name;
+            ColumnPtr res_col;
+            RETURN_IF_ERROR(children[0]->execute_column(context, 
&lambda_block, res_col));
+            res_col = res_col->convert_to_full_column_if_const();
+            res_type = children[0]->execute_type(&lambda_block);
+
             if (!result_col) {
                 result_col = res_col->clone_empty();
             }
@@ -302,40 +297,32 @@ public:
         //4. get the result column after execution, reassemble it into a new 
array column, and return.
         if (result_type->is_nullable()) {
             if (res_type->is_nullable()) {
-                result_arr = {
-                        
ColumnNullable::create(ColumnArray::create(std::move(result_col),
-                                                                   
std::move(array_column_offset)),
-                                               std::move(outside_null_map)),
-                        result_type, res_name};
+                result_column = ColumnNullable::create(
+                        ColumnArray::create(std::move(result_col), 
std::move(array_column_offset)),
+                        std::move(outside_null_map));
             } else {
                 // deal with eg: select array_map(x -> x is null, [null, 1, 
2]);
                 // need to create the nested column null map for column array
                 auto nested_null_map = ColumnUInt8::create(result_col->size(), 
0);
-                result_arr = {ColumnNullable::create(
-                                      ColumnArray::create(
-                                              
ColumnNullable::create(std::move(result_col),
-                                                                     
std::move(nested_null_map)),
-                                              std::move(array_column_offset)),
-                                      std::move(outside_null_map)),
-                              result_type, res_name};
+
+                result_column = ColumnNullable::create(
+                        
ColumnArray::create(ColumnNullable::create(std::move(result_col),
+                                                                   
std::move(nested_null_map)),
+                                            std::move(array_column_offset)),
+                        std::move(outside_null_map));
             }
         } else {
             if (res_type->is_nullable()) {
-                result_arr = {
-                        ColumnArray::create(std::move(result_col), 
std::move(array_column_offset)),
-                        result_type, res_name};
+                result_column =
+                        ColumnArray::create(std::move(result_col), 
std::move(array_column_offset));
             } else {
                 auto nested_null_map = ColumnUInt8::create(result_col->size(), 
0);
-                result_arr = {
-                        
ColumnArray::create(ColumnNullable::create(std::move(result_col),
-                                                                   
std::move(nested_null_map)),
-                                            std::move(array_column_offset)),
-                        result_type, res_name};
+
+                result_column = ColumnArray::create(
+                        ColumnNullable::create(std::move(result_col), 
std::move(nested_null_map)),
+                        std::move(array_column_offset));
             }
         }
-        block->insert(std::move(result_arr));
-        *result_column_id = block->columns() - 1;
-
         return Status::OK();
     }
 
@@ -368,7 +355,7 @@ private:
         }
     }
 
-    void _extend_data(std::vector<MutableColumnPtr>& columns, Block* block,
+    void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* 
block,
                       int current_repeat_times, int size, int64_t 
current_row_idx,
                       const std::vector<int>& output_slot_ref_indexs) const {
         if (!current_repeat_times || !size) {
diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp 
b/be/src/vec/exprs/vbitmap_predicate.cpp
index f2744106f72..775cc5b6faf 100644
--- a/be/src/vec/exprs/vbitmap_predicate.cpp
+++ b/be/src/vec/exprs/vbitmap_predicate.cpp
@@ -75,23 +75,17 @@ doris::Status 
vectorized::VBitmapPredicate::open(doris::RuntimeState* state,
     return Status::OK();
 }
 
-doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext* 
context,
-                                                    doris::vectorized::Block* 
block,
-                                                    int* result_column_id) 
const {
+Status VBitmapPredicate::execute_column(VExprContext* context, const Block* 
block,
+                                        ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col);
-    doris::vectorized::ColumnNumbers arguments(_children.size());
-    for (int i = 0; i < _children.size(); ++i) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        arguments[i] = column_id;
-    }
-    // call function
-    uint32_t num_columns_without_result = block->columns();
-    auto res_data_column = ColumnUInt8::create(block->rows());
+    DCHECK_EQ(_children.size(), 1);
+
+    ColumnPtr argument_column;
+    RETURN_IF_ERROR(_children[0]->execute_column(context, block, 
argument_column));
+    argument_column = argument_column->convert_to_full_column_if_const();
 
-    ColumnPtr argument_column =
-            
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
     size_t sz = argument_column->size();
+    auto res_data_column = ColumnUInt8::create(block->rows());
     res_data_column->resize(sz);
     auto* ptr = res_data_column->get_data().data();
 
@@ -106,8 +100,7 @@ doris::Status 
vectorized::VBitmapPredicate::execute(vectorized::VExprContext* co
         _filter->find_batch(argument_column->get_raw_data().data, nullptr, sz, 
ptr);
     }
 
-    block->insert({std::move(res_data_column), _data_type, EXPR_NAME});
-    *result_column_id = num_columns_without_result;
+    result_column = std::move(res_data_column);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vbitmap_predicate.h 
b/be/src/vec/exprs/vbitmap_predicate.h
index 704291c8010..1e313da6371 100644
--- a/be/src/vec/exprs/vbitmap_predicate.h
+++ b/be/src/vec/exprs/vbitmap_predicate.h
@@ -49,7 +49,8 @@ public:
 
     ~VBitmapPredicate() override = default;
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
 
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
 
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index a9464a4eceb..45fc13e85e7 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -70,26 +70,23 @@ void VBloomPredicate::close(VExprContext* context, 
FunctionContext::FunctionStat
     VExpr::close(context, scope);
 }
 
-Status VBloomPredicate::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status VBloomPredicate::execute_column(VExprContext* context, const Block* 
block,
+                                       ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col);
-    doris::vectorized::ColumnNumbers arguments(_children.size());
-    for (int i = 0; i < _children.size(); ++i) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        arguments[i] = column_id;
-    }
-    // call function
-    auto num_columns_without_result = block->columns();
-    auto res_data_column = ColumnUInt8::create(block->rows());
+    DCHECK_EQ(_children.size(), 1);
+
+    ColumnPtr argument_column;
+    RETURN_IF_ERROR(_children[0]->execute_column(context, block, 
argument_column));
+    argument_column = argument_column->convert_to_full_column_if_const();
 
-    ColumnPtr argument_column =
-            
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    auto res_data_column = ColumnUInt8::create(block->rows());
     size_t sz = argument_column->size();
     res_data_column->resize(sz);
     auto* ptr = ((ColumnUInt8*)res_data_column.get())->get_data().data();
+
     _filter->find_fixed_len(argument_column, ptr);
-    block->insert({std::move(res_data_column), _data_type, EXPR_NAME});
-    *result_column_id = num_columns_without_result;
+
+    result_column = std::move(res_data_column);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vbloom_predicate.h 
b/be/src/vec/exprs/vbloom_predicate.h
index b76fa274082..99b075f36e6 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -43,7 +43,8 @@ class VBloomPredicate final : public VExpr {
 public:
     VBloomPredicate(const TExprNode& node);
     ~VBloomPredicate() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index 534abad613d..2c52c6f4073 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -76,9 +76,11 @@ void VCaseExpr::close(VExprContext* context, 
FunctionContext::FunctionStateScope
     VExpr::close(context, scope);
 }
 
-Status VCaseExpr::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status VCaseExpr::execute_column(VExprContext* context, const Block* block,
+                                 ColumnPtr& result_column) const {
     if (is_const_and_have_executed()) { // const have execute in open function
-        return get_result_from_const(block, EXPR_NAME, result_column_id);
+        result_column = get_result_from_const(block);
+        return Status::OK();
     }
     DCHECK(_open_finished || _getting_const_col);
 
@@ -87,43 +89,33 @@ Status VCaseExpr::execute(VExprContext* context, Block* 
block, int* result_colum
     std::vector<ColumnPtr> then_columns;
 
     if (_has_else_expr) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children.back()->execute(context, block, &column_id));
-        auto else_column_ptr = block->get_by_position(column_id).column;
+        ColumnPtr else_column_ptr;
+        RETURN_IF_ERROR(_children.back()->execute_column(context, block, 
else_column_ptr));
         then_columns.emplace_back(else_column_ptr);
     } else {
         then_columns.emplace_back(nullptr);
     }
 
-    size_t origin_block_size = block->columns();
     for (int i = 0; i < _children.size() - _has_else_expr; i += 2) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        auto when_column_ptr = block->get_by_position(column_id).column;
+        ColumnPtr when_column_ptr;
+        RETURN_IF_ERROR(_children[i]->execute_column(context, block, 
when_column_ptr));
         if (calculate_false_number(when_column_ptr) == rows_count) {
-            block->erase_tail(origin_block_size);
             continue;
         }
         when_columns.emplace_back(when_column_ptr);
-        RETURN_IF_ERROR(_children[i + 1]->execute(context, block, &column_id));
-        auto then_column_ptr = block->get_by_position(column_id).column;
+        ColumnPtr then_column_ptr;
+        RETURN_IF_ERROR(_children[i + 1]->execute_column(context, block, 
then_column_ptr));
         then_columns.emplace_back(then_column_ptr);
-        block->erase_tail(origin_block_size);
     }
 
     if (then_columns.size() > UINT16_MAX) {
         return Status::NotSupported(
                 "case when do not support more than UINT16_MAX then 
conditions");
     } else if (then_columns.size() > UINT8_MAX) {
-        block->insert({_execute_impl<uint16_t>(when_columns, then_columns, 
rows_count), _data_type,
-                       EXPR_NAME});
+        result_column = _execute_impl<uint16_t>(when_columns, then_columns, 
rows_count);
     } else {
-        block->insert({_execute_impl<uint8_t>(when_columns, then_columns, 
rows_count), _data_type,
-                       EXPR_NAME});
+        result_column = _execute_impl<uint8_t>(when_columns, then_columns, 
rows_count);
     }
-
-    *result_column_id = block->columns() - 1;
-
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vcase_expr.h b/be/src/vec/exprs/vcase_expr.h
index e267f216127..f77b895fe1d 100644
--- a/be/src/vec/exprs/vcase_expr.h
+++ b/be/src/vec/exprs/vcase_expr.h
@@ -52,7 +52,8 @@ class VCaseExpr final : public VExpr {
 public:
     VCaseExpr(const TExprNode& node);
     ~VCaseExpr() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 98e94581f77..ca2bb9057b4 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -105,25 +105,26 @@ void VCastExpr::close(VExprContext* context, 
FunctionContext::FunctionStateScope
     VExpr::close(context, scope);
 }
 
-doris::Status VCastExpr::execute(VExprContext* context, 
doris::vectorized::Block* block,
-                                 int* result_column_id) const {
+Status VCastExpr::execute_column(VExprContext* context, const Block* block,
+                                 ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col)
             << _open_finished << _getting_const_col << _expr_name;
     if (is_const_and_have_executed()) { // const have executed in open function
-        return get_result_from_const(block, _expr_name, result_column_id);
+        result_column = get_result_from_const(block);
+        return Status::OK();
     }
     // for each child call execute
-    int column_id = 0;
-    RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id));
-
-    // call function
-    uint32_t num_columns_without_result = block->columns();
-    // prepare a column to save result
-    block->insert({nullptr, _data_type, _expr_name});
-    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
*block,
-                                       {static_cast<uint32_t>(column_id)},
-                                       num_columns_without_result, 
block->rows()));
-    *result_column_id = num_columns_without_result;
+
+    ColumnPtr from_column;
+    RETURN_IF_ERROR(_children[0]->execute_column(context, block, from_column));
+
+    Block temp_block;
+    temp_block.insert({from_column, _children[0]->execute_type(block), 
_children[0]->expr_name()});
+    temp_block.insert({nullptr, _data_type, _expr_name});
+    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
temp_block, {0}, 1,
+                                       block->rows()));
+
+    result_column = temp_block.get_by_position(1).column;
     return Status::OK();
 }
 
@@ -144,37 +145,37 @@ DataTypePtr TryCastExpr::original_cast_return_type() 
const {
     }
 }
 
-Status TryCastExpr::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status TryCastExpr::execute_column(VExprContext* context, const Block* block,
+                                   ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col)
             << _open_finished << _getting_const_col << _expr_name;
     if (is_const_and_have_executed()) { // const have executed in open function
-        return get_result_from_const(block, _expr_name, result_column_id);
+        result_column = get_result_from_const(block);
+        return Status::OK();
     }
 
-    int input_column_id = 0;
-
     // For try_cast, try to execute it in batches first.
 
     // execute child first
-    RETURN_IF_ERROR(_children[0]->execute(context, block, &input_column_id));
+
+    ColumnPtr from_column;
+    RETURN_IF_ERROR(_children[0]->execute_column(context, block, from_column));
+    auto from_type = _children[0]->execute_type(block);
 
     // prepare block
-    int output_column_id = block->columns();
-    block->insert({nullptr, original_cast_return_type(), _expr_name});
+
+    Block temp_block;
+    temp_block.insert({from_column, from_type, _children[0]->expr_name()});
+    temp_block.insert({nullptr, original_cast_return_type(), _expr_name});
 
     // batch execute
-    auto batch_exec_status = 
_function->execute(context->fn_context(_fn_context_index), *block,
-                                                
{static_cast<uint32_t>(input_column_id)},
-                                                output_column_id, 
block->rows());
+    auto batch_exec_status = 
_function->execute(context->fn_context(_fn_context_index), temp_block,
+                                                {0}, 1, block->rows());
     // If batch is executed successfully,
     // it means that there is no error and it will be returned directly.
     if (batch_exec_status.ok()) {
-        // wrap nullable
-        block->get_by_position(output_column_id).column =
-                make_nullable(block->get_by_position(output_column_id).column);
-        block->get_by_position(output_column_id).type =
-                make_nullable(block->get_by_position(output_column_id).type);
-        *result_column_id = output_column_id;
+        result_column = temp_block.get_by_position(1).column;
+        result_column = make_nullable(result_column);
         return batch_exec_status;
     }
 
@@ -185,19 +186,16 @@ Status TryCastExpr::execute(VExprContext* context, Block* 
block, int* result_col
 
     // If there is an error that can be handled by try cast,
     // it will be converted into line execution.
-    auto& input_info = block->get_by_position(input_column_id);
-    ColumnPtr return_column;
+    ColumnWithTypeAndName input_info {from_column, from_type, 
_children[0]->expr_name()};
     // distinguish whether the return value of the original cast is nullable
     if (_original_cast_return_is_nullable) {
-        RETURN_IF_ERROR(single_row_execute<true>(context, input_info, 
return_column));
+        RETURN_IF_ERROR(single_row_execute<true>(context, input_info, 
result_column));
     } else {
-        RETURN_IF_ERROR(single_row_execute<false>(context, input_info, 
return_column));
+        RETURN_IF_ERROR(single_row_execute<false>(context, input_info, 
result_column));
     }
     // wrap nullable
-    block->get_by_position(output_column_id).column = return_column;
-    block->get_by_position(output_column_id).type =
-            make_nullable(block->get_by_position(output_column_id).type);
-    *result_column_id = output_column_id;
+    result_column = make_nullable(result_column);
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vcast_expr.h b/be/src/vec/exprs/vcast_expr.h
index 344d30e8c31..663f2c4b6ed 100644
--- a/be/src/vec/exprs/vcast_expr.h
+++ b/be/src/vec/exprs/vcast_expr.h
@@ -48,7 +48,8 @@ public:
 #endif
     VCastExpr(const TExprNode& node) : VExpr(node) {}
     ~VCastExpr() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
@@ -82,7 +83,8 @@ public:
 
     TryCastExpr(const TExprNode& node)
             : VCastExpr(node), 
_original_cast_return_is_nullable(node.is_cast_nullable) {}
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     ~TryCastExpr() override = default;
     std::string cast_name() const override { return "TRY CAST"; }
 
diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h
index b04064accca..123b9785e71 100644
--- a/be/src/vec/exprs/vcolumn_ref.h
+++ b/be/src/vec/exprs/vcolumn_ref.h
@@ -57,12 +57,18 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
         DCHECK(_open_finished || _getting_const_col);
-        *result_column_id = _column_id + _gap;
+        result_column = block->get_by_position(_column_id + _gap).column;
         return Status::OK();
     }
 
+    DataTypePtr execute_type(const Block* block) const override {
+        DCHECK(_open_finished || _getting_const_col);
+        return block->get_by_position(_column_id + _gap).type;
+    }
+
     bool is_constant() const override { return false; }
 
     int column_id() const { return _column_id; }
diff --git a/be/src/vec/exprs/vcompound_pred.h 
b/be/src/vec/exprs/vcompound_pred.h
index 0eeb2355bf2..126198f486c 100644
--- a/be/src/vec/exprs/vcompound_pred.h
+++ b/be/src/vec/exprs/vcompound_pred.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "util/simd/bits.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
 #include "vec/common/assert_cast.h"
 #include "vec/exprs/vectorized_fn_call.h"
 #include "vec/exprs/vexpr_context.h"
@@ -155,23 +156,20 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
-        if (fast_execute(context, block, result_column_id)) {
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
+        if (fast_execute(context, result_column)) {
             return Status::OK();
         }
         if (get_num_children() == 1 || _has_const_child()) {
-            return VectorizedFnCall::execute(context, block, result_column_id);
+            return VectorizedFnCall::execute_column(context, block, 
result_column);
         }
 
-        int lhs_id = -1;
-        int rhs_id = -1;
-        bool lhs_mem_can_reuse = _children[0]->is_compound_predicate();
-        bool rhs_mem_can_reuse = _children[1]->is_compound_predicate();
-
-        RETURN_IF_ERROR(_children[0]->execute(context, block, &lhs_id));
-        ColumnPtr lhs_column =
-                
block->get_by_position(lhs_id).column->convert_to_full_column_if_const();
+        ColumnPtr lhs_column;
+        RETURN_IF_ERROR(_children[0]->execute_column(context, block, 
lhs_column));
+        lhs_column = lhs_column->convert_to_full_column_if_const();
         size_t size = lhs_column->size();
+
         bool lhs_is_nullable = lhs_column->is_nullable();
         auto [lhs_data_column, lhs_null_map] =
                 _get_raw_data_and_null_map(lhs_column, lhs_is_nullable);
@@ -195,10 +193,9 @@ public:
         bool result_is_nullable = _data_type->is_nullable();
 
         auto get_rhs_colum = [&]() {
-            if (rhs_id == -1) {
-                RETURN_IF_ERROR(_children[1]->execute(context, block, 
&rhs_id));
-                rhs_column =
-                        
block->get_by_position(rhs_id).column->convert_to_full_column_if_const();
+            if (!rhs_column) {
+                RETURN_IF_ERROR(_children[1]->execute_column(context, block, 
rhs_column));
+                rhs_column = rhs_column->convert_to_full_column_if_const();
                 rhs_is_nullable = rhs_column->is_nullable();
                 auto rhs_nullable_column = 
_get_raw_data_and_null_map(rhs_column, rhs_is_nullable);
                 rhs_data_column = rhs_nullable_column.first;
@@ -214,23 +211,11 @@ public:
             return Status::OK();
         };
 
-        auto return_result_column_id = [&](ColumnPtr res_column, int res_id,
-                                           bool mem_reuse) -> int {
-            if (!mem_reuse) {
-                res_column = res_column->clone_resized(size);
+        auto return_result_column_id = [&](ColumnPtr& arg_column) {
+            result_column = std::move(*arg_column).mutate();
+            if (result_is_nullable && !result_column->is_nullable()) {
+                result_column = make_nullable(result_column);
             }
-
-            if (result_is_nullable && !res_column->is_nullable()) {
-                auto result_column =
-                        ColumnNullable::create(res_column, 
ColumnUInt8::create(size, 0));
-                res_id = block->columns();
-                block->insert({std::move(result_column), _data_type, 
_expr_name});
-            } else if (!mem_reuse) {
-                res_id = block->columns();
-                block->insert({std::move(res_column), _data_type, _expr_name});
-            }
-
-            return res_id;
         };
 
         auto create_null_map_column = [&](ColumnPtr& null_map_column,
@@ -245,20 +230,17 @@ public:
         };
 
         auto vector_vector = [&]<bool is_and_op>() {
-            if (lhs_mem_can_reuse) {
-                *result_column_id = lhs_id;
-            } else if (rhs_mem_can_reuse) {
-                *result_column_id = rhs_id;
-
+            if (lhs_column->use_count() == 1) {
+                result_column = lhs_column;
+            } else if (rhs_column->use_count() == 1) {
+                result_column = rhs_column;
                 auto tmp_column = rhs_data_column;
                 rhs_data_column = lhs_data_column;
                 lhs_data_column = tmp_column;
             } else {
-                *result_column_id = block->columns();
-
                 auto col_res = lhs_column->clone_resized(size);
                 lhs_data_column = 
assert_cast<ColumnUInt8*>(col_res.get())->get_data().data();
-                block->insert({std::move(col_res), _data_type, _expr_name});
+                result_column = std::move(col_res);
             }
 
             if constexpr (is_and_op) {
@@ -299,9 +281,7 @@ public:
                     res_datas[i] = lhs_data_column_tmp[i] | 
rhs_data_column_tmp[i];
                 }
             }
-            auto result_column = ColumnNullable::create(std::move(col_res), 
std::move(col_nulls));
-            *result_column_id = block->columns();
-            block->insert({std::move(result_column), _data_type, _expr_name});
+            result_column = ColumnNullable::create(std::move(col_res), 
std::move(col_nulls));
         };
 
         // false and NULL ----> 0
@@ -311,25 +291,23 @@ public:
             //2. nullable column: null map all is not null
             if ((lhs_all_false && !lhs_is_nullable) || (lhs_all_false && 
lhs_all_is_not_null)) {
                 // false and any = false, return lhs
-                *result_column_id = return_result_column_id(lhs_column, 
lhs_id, lhs_mem_can_reuse);
+                return_result_column_id(lhs_column);
             } else {
                 RETURN_IF_ERROR(get_rhs_colum());
 
                 if ((lhs_all_true && !lhs_is_nullable) ||    //not null column
                     (lhs_all_true && lhs_all_is_not_null)) { //nullable column
-                    // true and any = any, return rhs
-                    *result_column_id =
-                            return_result_column_id(rhs_column, rhs_id, 
rhs_mem_can_reuse);
+                                                             // true and any = 
any, return rhs
+
+                    return_result_column_id(rhs_column);
                 } else if ((rhs_all_false && !rhs_is_nullable) ||
                            (rhs_all_false && rhs_all_is_not_null)) {
                     // any and false = false, return rhs
-                    *result_column_id =
-                            return_result_column_id(rhs_column, rhs_id, 
rhs_mem_can_reuse);
+                    return_result_column_id(rhs_column);
                 } else if ((rhs_all_true && !rhs_is_nullable) ||
                            (rhs_all_true && rhs_all_is_not_null)) {
                     // any and true = any, return lhs
-                    *result_column_id =
-                            return_result_column_id(lhs_column, lhs_id, 
lhs_mem_can_reuse);
+                    return_result_column_id(lhs_column);
                 } else {
                     if (!result_is_nullable) {
                         vector_vector.template operator()<true>();
@@ -343,23 +321,20 @@ public:
             // false or NULL ----> NULL
             if ((lhs_all_true && !lhs_is_nullable) || (lhs_all_true && 
lhs_all_is_not_null)) {
                 // true or any = true, return lhs
-                *result_column_id = return_result_column_id(lhs_column, 
lhs_id, lhs_mem_can_reuse);
+                return_result_column_id(lhs_column);
             } else {
                 RETURN_IF_ERROR(get_rhs_colum());
                 if ((lhs_all_false && !lhs_is_nullable) || (lhs_all_false && 
lhs_all_is_not_null)) {
                     // false or any = any, return rhs
-                    *result_column_id =
-                            return_result_column_id(rhs_column, rhs_id, 
rhs_mem_can_reuse);
+                    return_result_column_id(rhs_column);
                 } else if ((rhs_all_true && !rhs_is_nullable) ||
                            (rhs_all_true && rhs_all_is_not_null)) {
                     // any or true = true, return rhs
-                    *result_column_id =
-                            return_result_column_id(rhs_column, rhs_id, 
rhs_mem_can_reuse);
+                    return_result_column_id(rhs_column);
                 } else if ((rhs_all_false && !rhs_is_nullable) ||
                            (rhs_all_false && rhs_all_is_not_null)) {
                     // any or false = any, return lhs
-                    *result_column_id =
-                            return_result_column_id(lhs_column, lhs_id, 
lhs_mem_can_reuse);
+                    return_result_column_id(lhs_column);
                 } else {
                     if (!result_is_nullable) {
                         vector_vector.template operator()<false>();
@@ -375,8 +350,6 @@ public:
         return Status::OK();
     }
 
-    bool is_compound_predicate() const override { return true; }
-
 private:
     static inline constexpr uint8_t apply_and_null(UInt8 a, UInt8 l_null, 
UInt8 b, UInt8 r_null) {
         // (<> && false) is false, (true && NULL) is NULL
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h 
b/be/src/vec/exprs/vdirect_in_predicate.h
index b82868eb74a..a7c57a9f72d 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -54,15 +54,14 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
-        ColumnNumbers arguments;
-        return _do_execute(context, block, result_column_id, arguments);
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
+        return _do_execute(context, block, result_column, nullptr);
     }
 
-    Status execute_runtime_filter(doris::vectorized::VExprContext* context,
-                                  doris::vectorized::Block* block, int* 
result_column_id,
-                                  ColumnNumbers& args) override {
-        return _do_execute(context, block, result_column_id, args);
+    Status execute_runtime_filter(VExprContext* context, const Block* block,
+                                  ColumnPtr& result_column, ColumnPtr* 
arg_column) const override {
+        return _do_execute(context, block, result_column, arg_column);
     }
 
     const std::string& expr_name() const override { return _expr_name; }
@@ -108,21 +107,20 @@ public:
     }
 
 private:
-    Status _do_execute(VExprContext* context, Block* block, int* 
result_column_id,
-                       ColumnNumbers& arguments) const {
+    Status _do_execute(VExprContext* context, const Block* block, ColumnPtr& 
result_column,
+                       ColumnPtr* arg_column) const {
         DCHECK(_open_finished || _getting_const_col);
-        arguments.resize(_children.size());
-        for (int i = 0; i < _children.size(); ++i) {
-            int column_id = -1;
-            RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-            arguments[i] = column_id;
+
+        ColumnPtr argument_column;
+        RETURN_IF_ERROR(_children[0]->execute_column(context, block, 
argument_column));
+        argument_column = argument_column->convert_to_full_column_if_const();
+
+        if (arg_column != nullptr) {
+            *arg_column = argument_column;
         }
 
-        uint32_t num_columns_without_result = block->columns();
-        auto res_data_column = ColumnUInt8::create(block->rows());
-        ColumnPtr argument_column =
-                
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
         size_t sz = argument_column->size();
+        auto res_data_column = ColumnUInt8::create(block->rows());
         res_data_column->resize(sz);
 
         if (argument_column->is_nullable()) {
@@ -136,10 +134,7 @@ private:
         }
 
         DCHECK(!_data_type->is_nullable());
-
-        block->insert({std::move(res_data_column), _data_type, _expr_name});
-
-        *result_column_id = num_columns_without_result;
+        result_column = std::move(res_data_column);
         return Status::OK();
     }
 
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp 
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 582d6832283..c551254487a 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -184,13 +184,13 @@ Status 
VectorizedFnCall::evaluate_inverted_index(VExprContext* context, uint32_t
     return _evaluate_inverted_index(context, _function, segment_num_rows);
 }
 
-Status VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context,
-                                     doris::vectorized::Block* block, int* 
result_column_id,
-                                     ColumnNumbers& args) const {
+Status VectorizedFnCall::_do_execute(VExprContext* context, const Block* block,
+                                     ColumnPtr& result_column, ColumnPtr* 
arg_column) const {
     if (is_const_and_have_executed()) { // const have executed in open function
-        return get_result_from_const(block, _expr_name, result_column_id);
+        result_column = get_result_from_const(block);
+        return Status::OK();
     }
-    if (fast_execute(context, block, result_column_id)) {
+    if (fast_execute(context, result_column)) {
         return Status::OK();
     }
     DBUG_EXECUTE_IF("VectorizedFnCall.must_in_slow_path", {
@@ -212,19 +212,25 @@ Status 
VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context,
         }
     })
     DCHECK(_open_finished || _getting_const_col) << debug_string();
-    // TODO: not execute const expr again, but use the const column in 
function context
-    args.resize(_children.size());
+
+    Block temp_block;
+    ColumnNumbers args(_children.size());
+
     for (int i = 0; i < _children.size(); ++i) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        args[i] = column_id;
+        ColumnPtr tmp_arg_column;
+        RETURN_IF_ERROR(_children[i]->execute_column(context, block, 
tmp_arg_column));
+        auto arg_type = _children[i]->execute_type(block);
+        temp_block.insert({tmp_arg_column, arg_type, 
_children[i]->expr_name()});
+        args[i] = i;
+
+        if (arg_column != nullptr && i == 0) {
+            *arg_column = tmp_arg_column;
+        }
     }
 
-    RETURN_IF_ERROR(check_constant(*block, args));
-    // call function
-    uint32_t num_columns_without_result = block->columns();
+    uint32_t num_columns_without_result = temp_block.columns();
     // prepare a column to save result
-    block->insert({nullptr, _data_type, _expr_name});
+    temp_block.insert({nullptr, _data_type, _expr_name});
 
     DBUG_EXECUTE_IF("VectorizedFnCall.wait_before_execute", {
         auto possibility = 
DebugPoints::instance()->get_debug_param_or_default<double>(
@@ -235,9 +241,9 @@ Status 
VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context,
         }
     });
 
-    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
*block, args,
-                                       num_columns_without_result, 
block->rows(), false));
-    *result_column_id = num_columns_without_result;
+    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
temp_block, args,
+                                       num_columns_without_result, 
block->rows()));
+    result_column = 
temp_block.get_by_position(num_columns_without_result).column;
     RETURN_IF_ERROR(block->check_type_and_column());
     return Status::OK();
 }
@@ -260,16 +266,15 @@ size_t VectorizedFnCall::estimate_memory(const size_t 
rows) {
     return estimate_size;
 }
 
-Status 
VectorizedFnCall::execute_runtime_filter(doris::vectorized::VExprContext* 
context,
-                                                doris::vectorized::Block* 
block,
-                                                int* result_column_id, 
ColumnNumbers& args) {
-    return _do_execute(context, block, result_column_id, args);
+Status VectorizedFnCall::execute_runtime_filter(VExprContext* context, const 
Block* block,
+                                                ColumnPtr& result_column,
+                                                ColumnPtr* arg_column) const {
+    return _do_execute(context, block, result_column, arg_column);
 }
 
-Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* 
block,
-                                 int* result_column_id) const {
-    ColumnNumbers arguments;
-    return _do_execute(context, block, result_column_id, arguments);
+Status VectorizedFnCall::execute_column(VExprContext* context, const Block* 
block,
+                                        ColumnPtr& result_column) const {
+    return _do_execute(context, block, result_column, nullptr);
 }
 
 const std::string& VectorizedFnCall::expr_name() const {
diff --git a/be/src/vec/exprs/vectorized_fn_call.h 
b/be/src/vec/exprs/vectorized_fn_call.h
index 06328b00019..cc5ee1d030a 100644
--- a/be/src/vec/exprs/vectorized_fn_call.h
+++ b/be/src/vec/exprs/vectorized_fn_call.h
@@ -52,10 +52,10 @@ public:
     VectorizedFnCall() = default;
 #endif
     VectorizedFnCall(const TExprNode& node);
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
-    Status execute_runtime_filter(doris::vectorized::VExprContext* context,
-                                  doris::vectorized::Block* block, int* 
result_column_id,
-                                  ColumnNumbers& args) override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
+    Status execute_runtime_filter(VExprContext* context, const Block* block,
+                                  ColumnPtr& result_column, ColumnPtr* 
arg_column) const override;
     Status evaluate_inverted_index(VExprContext* context, uint32_t 
segment_num_rows) override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
@@ -101,8 +101,8 @@ protected:
     std::string _function_name;
 
 private:
-    Status _do_execute(doris::vectorized::VExprContext* context, 
doris::vectorized::Block* block,
-                       int* result_column_id, ColumnNumbers& args) const;
+    Status _do_execute(VExprContext* context, const Block* block, ColumnPtr& 
result_column,
+                       ColumnPtr* arg_column) const;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index c1def5065f4..f4076ad01a1 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -803,12 +803,8 @@ Status VExpr::check_constant(const Block& block, 
ColumnNumbers arguments) const
     return Status::OK();
 }
 
-Status VExpr::get_result_from_const(vectorized::Block* block, const 
std::string& expr_name,
-                                    int* result_column_id) const {
-    *result_column_id = block->columns();
-    auto column = ColumnConst::create(_constant_col->column_ptr, 
block->rows());
-    block->insert({std::move(column), _data_type, expr_name});
-    return Status::OK();
+ColumnPtr VExpr::get_result_from_const(const Block* block) const {
+    return ColumnConst::create(_constant_col->column_ptr, block->rows());
 }
 
 Status VExpr::_evaluate_inverted_index(VExprContext* context, const 
FunctionBasePtr& function,
@@ -954,22 +950,15 @@ size_t VExpr::estimate_memory(const size_t rows) {
     return estimate_size;
 }
 
-bool VExpr::fast_execute(doris::vectorized::VExprContext* context, 
doris::vectorized::Block* block,
-                         int* result_column_id) const {
+bool VExpr::fast_execute(VExprContext* context, ColumnPtr& result_column) 
const {
     if (context->get_inverted_index_context() &&
         
context->get_inverted_index_context()->get_inverted_index_result_column().contains(this))
 {
-        uint32_t num_columns_without_result = block->columns();
         // prepare a column to save result
-        auto result_column =
+        result_column =
                 
context->get_inverted_index_context()->get_inverted_index_result_column()[this];
         if (_data_type->is_nullable()) {
-            block->insert(
-                    {ColumnNullable::create(result_column, 
ColumnUInt8::create(block->rows(), 0)),
-                     _data_type, expr_name()});
-        } else {
-            block->insert({result_column, _data_type, expr_name()});
+            result_column = make_nullable(result_column);
         }
-        *result_column_id = num_columns_without_result;
         return true;
     }
     return false;
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 0f304f7724d..26accb70205 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -30,6 +30,7 @@
 #include <utility>
 #include <vector>
 
+#include "common/be_mock_util.h"
 #include "common/status.h"
 #include "olap/rowset/segment_v2/ann_index/ann_search_params.h"
 #include "olap/rowset/segment_v2/column_reader.h"
@@ -130,7 +131,22 @@ public:
         return Status::InternalError(expr_name() + " is not ready when 
execute");
     }
 
-    virtual Status execute(VExprContext* context, Block* block, int* 
result_column_id) const = 0;
+    virtual Status execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+        ColumnPtr result_column;
+        RETURN_IF_ERROR(execute_column(context, block, result_column));
+        *result_column_id = block->columns();
+        block->insert({result_column, execute_type(block), expr_name()});
+        return Status::OK();
+    }
+
+    // execute current expr and return result column
+    virtual Status execute_column(VExprContext* context, const Block* block,
+                                  ColumnPtr& result_column) const = 0;
+
+    // Currently, due to fe planning issues, for slot-ref expressions the type 
of the returned Column may not match data_type.
+    // Therefore we need a function like this to return the actual type 
produced by execution.
+    virtual DataTypePtr execute_type(const Block* block) const { return 
_data_type; }
+
     // `is_blockable` means this expr will be blocked in `execute` (e.g. AI 
Function, Remote Function)
     [[nodiscard]] virtual bool is_blockable() const {
         return std::any_of(_children.begin(), _children.end(),
@@ -149,9 +165,9 @@ public:
 
     // Only the 4th parameter is used in the runtime filter. In and MinMax 
need overwrite the
     // interface
-    virtual Status execute_runtime_filter(VExprContext* context, Block* block,
-                                          int* result_column_id, 
ColumnNumbers& args) {
-        return execute(context, block, result_column_id);
+    virtual Status execute_runtime_filter(VExprContext* context, const Block* 
block,
+                                          ColumnPtr& result_column, ColumnPtr* 
arg_column) const {
+        return execute_column(context, block, result_column);
     };
 
     /// Subclasses overriding this function should call VExpr::Close().
@@ -233,8 +249,6 @@ public:
 
     bool is_and_expr() const { return _fn.name.function_name == "and"; }
 
-    virtual bool is_compound_predicate() const { return false; }
-
     const TFunction& fn() const { return _fn; }
 
     /// Returns true if expr doesn't contain slotrefs, i.e., can be evaluated
@@ -278,8 +292,7 @@ public:
     }
 
     // fast_execute can direct copy expr filter result which build by apply 
index in segment_iterator
-    bool fast_execute(doris::vectorized::VExprContext* context, 
doris::vectorized::Block* block,
-                      int* result_column_id) const;
+    bool fast_execute(VExprContext* context, ColumnPtr& result_column) const;
 
     virtual bool can_push_down_to_index() const { return false; }
     virtual bool equals(const VExpr& other);
@@ -350,8 +363,7 @@ protected:
         return (is_constant() && (_constant_col != nullptr));
     }
 
-    Status get_result_from_const(vectorized::Block* block, const std::string& 
expr_name,
-                                 int* result_column_id) const;
+    ColumnPtr get_result_from_const(const Block* block) const;
 
     Status check_constant(const Block& block, ColumnNumbers arguments) const;
 
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index 3d3afa9feff..67514ea364c 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -73,6 +73,12 @@ Status VExprContext::execute(vectorized::Block* block, int* 
result_column_id) {
     return st;
 }
 
+Status VExprContext::execute(Block* block, ColumnPtr& result_column) {
+    Status st;
+    RETURN_IF_CATCH_EXCEPTION({ st = _root->execute_column(this, block, 
result_column); });
+    return st;
+}
+
 bool VExprContext::is_blockable() const {
     return _root->is_blockable();
 }
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 1331e0526f6..f02296dc268 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -34,6 +34,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "udf/udf.h"
+#include "vec/columns/column.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_fwd.h"
 
@@ -172,6 +173,7 @@ public:
     [[nodiscard]] Status open(RuntimeState* state);
     [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
     [[nodiscard]] Status execute(Block* block, int* result_column_id);
+    [[nodiscard]] Status execute(Block* block, ColumnPtr& result_column);
     [[nodiscard]] bool is_blockable() const;
 
     VExprSPtr root() { return _root; }
diff --git a/be/src/vec/exprs/vin_predicate.cpp 
b/be/src/vec/exprs/vin_predicate.cpp
index f6f50b0ca6b..a35d401cbe2 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -114,11 +114,13 @@ Status 
VInPredicate::evaluate_inverted_index(VExprContext* context, uint32_t seg
     return _evaluate_inverted_index(context, _function, segment_num_rows);
 }
 
-Status VInPredicate::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status VInPredicate::execute_column(VExprContext* context, const Block* block,
+                                    ColumnPtr& result_column) const {
     if (is_const_and_have_executed()) { // const have execute in open function
-        return get_result_from_const(block, _expr_name, result_column_id);
+        result_column = get_result_from_const(block);
+        return Status::OK();
     }
-    if (fast_execute(context, block, result_column_id)) {
+    if (fast_execute(context, result_column)) {
         return Status::OK();
     }
     DCHECK(_open_finished || _getting_const_col);
@@ -129,20 +131,21 @@ Status VInPredicate::execute(VExprContext* context, 
Block* block, int* result_co
     //  Here, _children[0] is colA
     const size_t args_size = _is_args_all_constant ? 1 : _children.size();
 
-    doris::vectorized::ColumnNumbers arguments(args_size);
+    ColumnNumbers arguments(args_size);
+    Block temp_block;
     for (int i = 0; i < args_size; ++i) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        arguments[i] = column_id;
+        ColumnPtr column;
+        RETURN_IF_ERROR(_children[i]->execute_column(context, block, column));
+        arguments.push_back(i);
+        temp_block.insert({column, _children[i]->execute_type(block), 
_children[i]->expr_name()});
     }
-    // call function
-    uint32_t num_columns_without_result = block->columns();
-    // prepare a column to save result
-    block->insert({nullptr, _data_type, _expr_name});
-
-    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
*block, arguments,
-                                       num_columns_without_result, 
block->rows(), false));
-    *result_column_id = num_columns_without_result;
+
+    int num_columns_without_result = temp_block.columns();
+    temp_block.insert({nullptr, _data_type, _expr_name});
+
+    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
temp_block,
+                                       arguments, num_columns_without_result, 
block->rows()));
+    result_column = 
temp_block.get_by_position(num_columns_without_result).column;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 04e24798e0c..cae926c8ac3 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -45,7 +45,8 @@ public:
     VInPredicate() = default;
 #endif
     ~VInPredicate() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     size_t estimate_memory(const size_t rows) override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
diff --git a/be/src/vec/exprs/vinfo_func.cpp b/be/src/vec/exprs/vinfo_func.cpp
index 3c18fbe1bf8..d631daa64bb 100644
--- a/be/src/vec/exprs/vinfo_func.cpp
+++ b/be/src/vec/exprs/vinfo_func.cpp
@@ -55,11 +55,11 @@ VInfoFunc::VInfoFunc(const TExprNode& node) : VExpr(node) {
     this->_column_ptr = _data_type->create_column_const(1, field);
 }
 
-Status VInfoFunc::execute(VExprContext* context, vectorized::Block* block,
-                          int* result_column_id) const {
+Status VInfoFunc::execute_column(VExprContext* context, const Block* block,
+                                 ColumnPtr& result_column) const {
     // Info function should return least one row, e.g. select current_user().
-    size_t row_size = std::max(block->rows(), 1UL);
-    *result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type, 
_expr_name}, row_size);
+    size_t row_size = std::max(block->rows(), _column_ptr->size());
+    result_column = _column_ptr->clone_resized(row_size);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vinfo_func.h b/be/src/vec/exprs/vinfo_func.h
index d804e697045..8aa8d37e3a2 100644
--- a/be/src/vec/exprs/vinfo_func.h
+++ b/be/src/vec/exprs/vinfo_func.h
@@ -39,7 +39,8 @@ public:
     ~VInfoFunc() override = default;
 
     const std::string& expr_name() const override { return _expr_name; }
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
 
 private:
     const std::string _expr_name = "vinfofunc expr";
diff --git a/be/src/vec/exprs/virtual_slot_ref.cpp 
b/be/src/vec/exprs/virtual_slot_ref.cpp
index eed6b56ef9a..034f76cf9eb 100644
--- a/be/src/vec/exprs/virtual_slot_ref.cpp
+++ b/be/src/vec/exprs/virtual_slot_ref.cpp
@@ -110,7 +110,8 @@ Status VirtualSlotRef::open(RuntimeState* state, 
VExprContext* context,
     return Status::OK();
 }
 
-Status VirtualSlotRef::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status VirtualSlotRef::execute_column(VExprContext* context, const Block* 
block,
+                                      ColumnPtr& result_column) const {
     if (_column_id >= 0 && _column_id >= block->columns()) {
         return Status::Error<ErrorCode::INTERNAL_ERROR>(
                 "input block not contain slot column {}, column_id={}, 
block={}", *_column_name,
@@ -118,6 +119,7 @@ Status VirtualSlotRef::execute(VExprContext* context, 
Block* block, int* result_
     }
 
     ColumnWithTypeAndName col_type_name = block->get_by_position(_column_id);
+    result_column = col_type_name.column;
 
     if (!col_type_name.column) {
         // Maybe we need to create a column in this situation.
@@ -126,25 +128,21 @@ Status VirtualSlotRef::execute(VExprContext* context, 
Block* block, int* result_
                 *_column_name);
     }
 
-    const vectorized::ColumnNothing* col_nothing =
-            check_and_get_column<ColumnNothing>(col_type_name.column.get());
+    const auto* col_nothing = 
check_and_get_column<ColumnNothing>(col_type_name.column.get());
 
     if (this->_virtual_column_expr != nullptr) {
         if (col_nothing != nullptr) {
             // Virtual column is not materialized, so we need to materialize 
it.
             // Note: After executing 'execute', we cannot use the column from 
line 120 in subsequent code,
             // because the vector might be resized during execution, causing 
previous references to become invalid.
-            int tmp_column_id = -1;
-            RETURN_IF_ERROR(_virtual_column_expr->execute(context, block, 
&tmp_column_id));
-
-            // Maybe do clone.
-            block->replace_by_position(_column_id,
-                                       
std::move(block->get_by_position(tmp_column_id).column));
+            ColumnPtr tmp_column;
+            RETURN_IF_ERROR(_virtual_column_expr->execute_column(context, 
block, tmp_column));
+            result_column = std::move(tmp_column);
 
             VLOG_DEBUG << fmt::format(
                     "Materialization of virtual column, slot_id {}, column_id 
{}, "
-                    "tmp_column_id {}, column_name {}, column size {}",
-                    _slot_id, _column_id, tmp_column_id, *_column_name,
+                    "column_name {}, column size {}",
+                    _slot_id, _column_id, *_column_name,
                     block->get_by_position(_column_id).column->size());
         }
 
@@ -168,10 +166,6 @@ Status VirtualSlotRef::execute(VExprContext* context, 
Block* block, int* result_
             return Status::OK();
         }
     }
-
-    *result_column_id = _column_id;
-    VLOG_DEBUG << fmt::format("VirtualSlotRef execute, slot_id {}, column_id 
{}, column_name {}",
-                              _slot_id, _column_id, *_column_name);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/virtual_slot_ref.h 
b/be/src/vec/exprs/virtual_slot_ref.h
index 3640542e713..0de2772486b 100644
--- a/be/src/vec/exprs/virtual_slot_ref.h
+++ b/be/src/vec/exprs/virtual_slot_ref.h
@@ -31,7 +31,8 @@ public:
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     const std::string& expr_name() const override;
     std::string expr_label() override;
     std::string debug_string() const override;
diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h 
b/be/src/vec/exprs/vlambda_function_call_expr.h
index dab888977d4..830e4922e86 100644
--- a/be/src/vec/exprs/vlambda_function_call_expr.h
+++ b/be/src/vec/exprs/vlambda_function_call_expr.h
@@ -63,9 +63,10 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
         DCHECK(_open_finished || _getting_const_col);
-        return _lambda_function->execute(context, block, result_column_id, 
_data_type, _children);
+        return _lambda_function->execute(context, block, result_column, 
_data_type, _children);
     }
 
     std::string debug_string() const override {
diff --git a/be/src/vec/exprs/vlambda_function_expr.h 
b/be/src/vec/exprs/vlambda_function_expr.h
index 3dce02d786b..766ebdb9af8 100644
--- a/be/src/vec/exprs/vlambda_function_expr.h
+++ b/be/src/vec/exprs/vlambda_function_expr.h
@@ -42,9 +42,14 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
         DCHECK(_open_finished || _getting_const_col);
-        return get_child(0)->execute(context, block, result_column_id);
+        return get_child(0)->execute_column(context, block, result_column);
+    }
+
+    DataTypePtr execute_type(const Block* block) const override {
+        return get_child(0)->execute_type(block);
     }
 
     const std::string& expr_name() const override { return _expr_name; }
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index 115a9d368a4..b2017f577e1 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -49,12 +49,10 @@ Status VLiteral::prepare(RuntimeState* state, const 
RowDescriptor& desc, VExprCo
     return Status::OK();
 }
 
-Status VLiteral::execute(VExprContext* context, vectorized::Block* block,
-                         int* result_column_id) const {
-    // Literal expr should return least one row.
-    // sometimes we just use a VLiteral without open or prepare. so can't 
check it at this moment
+Status VLiteral::execute_column(VExprContext* context, const Block* block,
+                                ColumnPtr& result_column) const {
     size_t row_size = std::max(block->rows(), _column_ptr->size());
-    *result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type, 
_expr_name}, row_size);
+    result_column = _column_ptr->clone_resized(row_size);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 2c8e32592fa..d416da571f6 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -48,7 +48,8 @@ public:
 #endif
 
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
 
     const std::string& expr_name() const override { return _expr_name; }
     std::string debug_string() const override;
diff --git a/be/src/vec/exprs/vmatch_predicate.cpp 
b/be/src/vec/exprs/vmatch_predicate.cpp
index 1b561ad1909..83698f1e43d 100644
--- a/be/src/vec/exprs/vmatch_predicate.cpp
+++ b/be/src/vec/exprs/vmatch_predicate.cpp
@@ -135,9 +135,10 @@ Status 
VMatchPredicate::evaluate_inverted_index(VExprContext* context, uint32_t
     return _evaluate_inverted_index(context, _function, segment_num_rows);
 }
 
-Status VMatchPredicate::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
+Status VMatchPredicate::execute_column(VExprContext* context, const Block* 
block,
+                                       ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col);
-    if (fast_execute(context, block, result_column_id)) {
+    if (fast_execute(context, result_column)) {
         return Status::OK();
     }
     DBUG_EXECUTE_IF("VMatchPredicate.execute", {
@@ -159,19 +160,22 @@ Status VMatchPredicate::execute(VExprContext* context, 
Block* block, int* result
                     "column {} should in slow path while 
VMatchPredicate::execute.", column_name);
         }
     })
-    doris::vectorized::ColumnNumbers arguments(_children.size());
+    ColumnNumbers arguments(_children.size());
+    Block temp_block;
     for (int i = 0; i < _children.size(); ++i) {
-        int column_id = -1;
-        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
-        arguments[i] = column_id;
+        ColumnPtr arg_column;
+        RETURN_IF_ERROR(_children[i]->execute_column(context, block, 
arg_column));
+        auto arg_type = _children[i]->execute_type(block);
+        temp_block.insert({arg_column, arg_type, _children[i]->expr_name()});
+        arguments[i] = i;
     }
-    // call function
-    uint32_t num_columns_without_result = block->columns();
+    uint32_t num_columns_without_result = temp_block.columns();
     // prepare a column to save result
-    block->insert({nullptr, _data_type, _expr_name});
-    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
*block, arguments,
-                                       num_columns_without_result, 
block->rows(), false));
-    *result_column_id = num_columns_without_result;
+    temp_block.insert({nullptr, _data_type, _expr_name});
+
+    RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), 
temp_block,
+                                       arguments, num_columns_without_result, 
block->rows()));
+    result_column = 
temp_block.get_by_position(num_columns_without_result).column;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vmatch_predicate.h 
b/be/src/vec/exprs/vmatch_predicate.h
index 68ddb4c6a9e..78dc06728c3 100644
--- a/be/src/vec/exprs/vmatch_predicate.h
+++ b/be/src/vec/exprs/vmatch_predicate.h
@@ -49,7 +49,8 @@ class VMatchPredicate final : public VExpr {
 public:
     VMatchPredicate(const TExprNode& node);
     ~VMatchPredicate() override;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index f2fd4ce9b6d..8e915ffff67 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -87,35 +87,32 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
     _impl->close(context, scope);
 }
 
-Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block,
-                                      int* result_column_id) const {
+Status VRuntimeFilterWrapper::execute_column(VExprContext* context, const 
Block* block,
+                                             ColumnPtr& result_column) const {
     DCHECK(_open_finished || _getting_const_col);
     if (_judge_counter.fetch_sub(1) == 0) {
         reset_judge_selectivity();
     }
     if (_always_true) {
         size_t size = block->rows();
-        block->insert({create_always_true_column(size, 
_data_type->is_nullable()), _data_type,
-                       expr_name()});
-        *result_column_id = block->columns() - 1;
+        result_column = create_always_true_column(size, 
_data_type->is_nullable());
         COUNTER_UPDATE(_always_true_filter_rows, size);
         return Status::OK();
     } else {
         if (_getting_const_col) {
             _impl->set_getting_const_col(true);
         }
-        ColumnNumbers args;
-        RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block, 
result_column_id, args));
+
+        ColumnPtr arg_column = nullptr;
+        RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block, 
result_column, &arg_column));
         if (_getting_const_col) {
             _impl->set_getting_const_col(false);
         }
 
-        ColumnWithTypeAndName& result_column = 
block->get_by_position(*result_column_id);
-
         // bloom filter will handle null aware inside itself
         if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) {
-            DCHECK_GE(args.size(), 1);
-            change_null_to_true(result_column.column, 
block->get_by_position(args[0]).column);
+            DCHECK(arg_column);
+            change_null_to_true(result_column->assume_mutable(), arg_column);
         }
 
         return Status::OK();
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h 
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index fa060255e06..cdf30cff814 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -53,7 +53,8 @@ public:
     VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double 
ignore_thredhold,
                           bool null_aware, int filter_id);
     ~VRuntimeFilterWrapper() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/vec/exprs/vsearch.cpp b/be/src/vec/exprs/vsearch.cpp
index def87bccc81..a0ea7a13306 100644
--- a/be/src/vec/exprs/vsearch.cpp
+++ b/be/src/vec/exprs/vsearch.cpp
@@ -125,8 +125,9 @@ const std::string& VSearchExpr::expr_name() const {
     return name;
 }
 
-Status VSearchExpr::execute(VExprContext* context, Block* block, int* 
result_column_id) const {
-    if (fast_execute(context, block, result_column_id)) {
+Status VSearchExpr::execute_column(VExprContext* context, const Block* block,
+                                   ColumnPtr& result_column) const {
+    if (fast_execute(context, result_column)) {
         return Status::OK();
     }
 
diff --git a/be/src/vec/exprs/vsearch.h b/be/src/vec/exprs/vsearch.h
index ede53c6872a..602524fe300 100644
--- a/be/src/vec/exprs/vsearch.h
+++ b/be/src/vec/exprs/vsearch.h
@@ -26,7 +26,8 @@ class VSearchExpr : public VExpr {
 public:
     VSearchExpr(const TExprNode& node);
     ~VSearchExpr() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
     const std::string& expr_name() const override;
     Status evaluate_inverted_index(VExprContext* context, uint32_t 
segment_num_rows) override;
 
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index b896e2f1df7..086be05cea4 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -97,6 +97,26 @@ Status VSlotRef::execute(VExprContext* context, Block* 
block, int* result_column
     return Status::OK();
 }
 
+Status VSlotRef::execute_column(VExprContext* context, const Block* block,
+                                ColumnPtr& result_column) const {
+    if (_column_id >= 0 && _column_id >= block->columns()) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                "input block not contain slot column {}, column_id={}, 
block={}", *_column_name,
+                _column_id, block->dump_structure());
+    }
+    result_column = block->get_by_position(_column_id).column;
+    return Status::OK();
+}
+
+DataTypePtr VSlotRef::execute_type(const Block* block) const {
+    if (_column_id >= 0 && _column_id >= block->columns()) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                               "input block not contain slot column {}, 
column_id={}, block={}",
+                               *_column_name, _column_id, 
block->dump_structure());
+    }
+    return block->get_by_position(_column_id).type;
+}
+
 const std::string& VSlotRef::expr_name() const {
     return *_column_name;
 }
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index f468ecbdb37..eb101a780a1 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -47,6 +47,9 @@ public:
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override;
+    DataTypePtr execute_type(const Block* block) const override;
 
     const std::string& expr_name() const override;
     std::string expr_label() override;
diff --git a/be/src/vec/exprs/vtopn_pred.h b/be/src/vec/exprs/vtopn_pred.h
index a22c936e530..409b054bd51 100644
--- a/be/src/vec/exprs/vtopn_pred.h
+++ b/be/src/vec/exprs/vtopn_pred.h
@@ -81,35 +81,43 @@ public:
         return Status::OK();
     }
 
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
         if (!_predicate->has_value()) {
-            block->insert({create_always_true_column(block->rows(), 
_data_type->is_nullable()),
-                           _data_type, _expr_name});
-            *result_column_id = block->columns() - 1;
+            result_column = create_always_true_column(block->rows(), 
_data_type->is_nullable());
             return Status::OK();
         }
 
+        Block temp_block;
+
+        // slot
+        ColumnPtr slot_column;
+        RETURN_IF_ERROR(_children[0]->execute_column(context, block, 
slot_column));
+        auto slot_type = _children[0]->execute_type(block);
+        temp_block.insert({slot_column, slot_type, _children[0]->expr_name()});
+        int slot_id = 0;
+
+        // topn value
         Field field = _predicate->get_value();
         auto column_ptr = _children[0]->data_type()->create_column_const(1, 
field);
-        size_t row_size = std::max(block->rows(), column_ptr->size());
-        int topn_value_id = VExpr::insert_param(
-                block, {column_ptr, _children[0]->data_type(), _expr_name}, 
row_size);
+        int topn_value_id = VExpr::insert_param(&temp_block,
+                                                {column_ptr, 
_children[0]->data_type(), _expr_name},
+                                                std::max(block->rows(), 
column_ptr->size()));
 
-        int slot_id = -1;
-        RETURN_IF_ERROR(_children[0]->execute(context, block, &slot_id));
         // if error(slot_id == -1), will return.
         ColumnNumbers arguments = {static_cast<uint32_t>(slot_id),
                                    static_cast<uint32_t>(topn_value_id)};
 
-        uint32_t num_columns_without_result = block->columns();
-        block->insert({nullptr, _data_type, _expr_name});
-        RETURN_IF_ERROR(_function->execute(nullptr, *block, arguments, 
num_columns_without_result,
-                                           block->rows(), false));
-        *result_column_id = num_columns_without_result;
+        uint32_t num_columns_without_result = temp_block.columns();
+        // prepare a column to save result
+        temp_block.insert({nullptr, _data_type, _expr_name});
 
+        RETURN_IF_ERROR(_function->execute(nullptr, temp_block, arguments,
+                                           num_columns_without_result, 
block->rows()));
+        result_column = 
std::move(temp_block.get_by_position(num_columns_without_result).column);
         if (is_nullable() && _predicate->nulls_first()) {
             // null values ​​are always not filtered
-            
change_null_to_true(block->get_by_position(num_columns_without_result).column);
+            change_null_to_true(result_column);
         }
         return Status::OK();
     }
diff --git a/be/src/vec/functions/cast/cast_to_variant.h 
b/be/src/vec/functions/cast/cast_to_variant.h
index c4722375d8f..cc236b1dec8 100644
--- a/be/src/vec/functions/cast/cast_to_variant.h
+++ b/be/src/vec/functions/cast/cast_to_variant.h
@@ -67,7 +67,7 @@ struct CastFromVariant {
                 // Note: here we should return the nullable result column
                 col_to = wrap_in_nullable(
                         col_to, Block({{nested, nested_from_type, ""}, 
{col_to, data_type_to, ""}}),
-                        {0}, 1, input_rows_count);
+                        {0}, input_rows_count);
             }
         } else {
             if (variant.only_have_default_values()) {
diff --git a/be/src/vec/functions/cast/function_cast.cpp 
b/be/src/vec/functions/cast/function_cast.cpp
index d5bb7422a57..7478ab6d4c0 100644
--- a/be/src/vec/functions/cast/function_cast.cpp
+++ b/be/src/vec/functions/cast/function_cast.cpp
@@ -193,7 +193,7 @@ WrapperType prepare_remove_nullable(FunctionContext* 
context, const DataTypePtr&
 
             block.get_by_position(result).column =
                     
wrap_in_nullable(block.get_by_position(nested_result_index).column, block,
-                                     arguments, result, input_rows_count);
+                                     arguments, input_rows_count);
 
             block.erase(nested_source_index);
             block.erase(nested_result_index);
diff --git a/be/src/vec/functions/function.cpp 
b/be/src/vec/functions/function.cpp
index 17ce7b846ee..dea9eab86b2 100644
--- a/be/src/vec/functions/function.cpp
+++ b/be/src/vec/functions/function.cpp
@@ -43,7 +43,7 @@
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 ColumnPtr wrap_in_nullable(const ColumnPtr& src, const Block& block, const 
ColumnNumbers& args,
-                           uint32_t result, size_t input_rows_count) {
+                           size_t input_rows_count) {
     ColumnPtr result_null_map_column;
     /// If result is already nullable.
     ColumnPtr src_not_nullable = src;
@@ -105,26 +105,23 @@ bool have_null_column(const ColumnsWithTypeAndName& args) 
{
     return std::ranges::any_of(args, [](const auto& elem) { return 
elem.type->is_nullable(); });
 }
 
-inline Status PreparedFunctionImpl::_execute_skipped_constant_deal(
-        FunctionContext* context, Block& block, const ColumnNumbers& args, 
uint32_t result,
-        size_t input_rows_count, bool dry_run) const {
+inline Status 
PreparedFunctionImpl::_execute_skipped_constant_deal(FunctionContext* context,
+                                                                   Block& 
block,
+                                                                   const 
ColumnNumbers& args,
+                                                                   uint32_t 
result,
+                                                                   size_t 
input_rows_count) const {
     bool executed = false;
     RETURN_IF_ERROR(default_implementation_for_nulls(context, block, args, 
result, input_rows_count,
-                                                     dry_run, &executed));
+                                                     &executed));
     if (executed) {
         return Status::OK();
     }
-
-    if (dry_run) {
-        return execute_impl_dry_run(context, block, args, result, 
input_rows_count);
-    } else {
-        return execute_impl(context, block, args, result, input_rows_count);
-    }
+    return execute_impl(context, block, args, result, input_rows_count);
 }
 
 Status PreparedFunctionImpl::default_implementation_for_constant_arguments(
         FunctionContext* context, Block& block, const ColumnNumbers& args, 
uint32_t result,
-        size_t input_rows_count, bool dry_run, bool* executed) const {
+        size_t input_rows_count, bool* executed) const {
     *executed = false;
     ColumnNumbers args_expect_const = get_arguments_that_are_always_constant();
 
@@ -153,15 +150,15 @@ Status 
PreparedFunctionImpl::default_implementation_for_constant_arguments(
         // If we unpack it, there will be unnecessary cost of virtual judge.
         if (args_expect_const.end() !=
             std::find(args_expect_const.begin(), args_expect_const.end(), 
arg_num)) {
-            temporary_block.insert({column.column, column.type, column.name});
+            temporary_block.simple_insert({column.column, column.type, 
column.name});
         } else {
-            temporary_block.insert(
+            temporary_block.simple_insert(
                     {assert_cast<const 
ColumnConst*>(column.column.get())->get_data_column_ptr(),
                      column.type, column.name});
         }
     }
 
-    temporary_block.insert(block.get_by_position(result));
+    temporary_block.simple_insert(block.get_by_position(result));
 
     ColumnNumbers temporary_argument_numbers(arguments_size);
     for (int i = 0; i < arguments_size; ++i) {
@@ -170,7 +167,7 @@ Status 
PreparedFunctionImpl::default_implementation_for_constant_arguments(
 
     RETURN_IF_ERROR(_execute_skipped_constant_deal(context, temporary_block,
                                                    temporary_argument_numbers, 
arguments_size,
-                                                   temporary_block.rows(), 
dry_run));
+                                                   temporary_block.rows()));
 
     ColumnPtr result_column;
     /// extremely rare case, when we have function with completely const 
arguments
@@ -188,7 +185,7 @@ Status 
PreparedFunctionImpl::default_implementation_for_constant_arguments(
 
 Status PreparedFunctionImpl::default_implementation_for_nulls(
         FunctionContext* context, Block& block, const ColumnNumbers& args, 
uint32_t result,
-        size_t input_rows_count, bool dry_run, bool* executed) const {
+        size_t input_rows_count, bool* executed) const {
     *executed = false;
     if (args.empty() || !use_default_implementation_for_nulls()) {
         return Status::OK();
@@ -207,47 +204,47 @@ Status 
PreparedFunctionImpl::default_implementation_for_nulls(
         bool need_to_default = need_replace_null_data_to_default();
         // extract nested column from nulls
         ColumnNumbers new_args;
-        for (auto arg : args) {
-            new_args.push_back(block.columns());
-            
block.insert(block.get_by_position(arg).unnest_nullable(need_to_default));
-            
DCHECK(!block.get_by_position(new_args.back()).column->is_nullable());
+        Block new_block;
+
+        for (int i = 0; i < args.size(); ++i) {
+            uint32_t arg = args[i];
+            new_args.push_back(i);
+            
new_block.simple_insert(block.get_by_position(arg).unnest_nullable(need_to_default));
         }
-        RETURN_IF_ERROR(execute_without_low_cardinality_columns(context, 
block, new_args, result,
-                                                                block.rows(), 
dry_run));
+        new_block.simple_insert(block.get_by_position(result));
+        int new_result = new_block.columns() - 1;
+
+        RETURN_IF_ERROR(default_execute(context, new_block, new_args, 
new_result, block.rows()));
         // After run with nested, wrap them in null. Before this, 
block.get_by_position(result).type
         // is not compatible with get_by_position(result).column
+
         block.get_by_position(result).column = wrap_in_nullable(
-                block.get_by_position(result).column, block, args, result, 
input_rows_count);
+                new_block.get_by_position(new_result).column, block, args, 
input_rows_count);
 
-        while (!new_args.empty()) {
-            block.erase(new_args.back());
-            new_args.pop_back();
-        }
         *executed = true;
         return Status::OK();
     }
     return Status::OK();
 }
 
-Status PreparedFunctionImpl::execute_without_low_cardinality_columns(
-        FunctionContext* context, Block& block, const ColumnNumbers& args, 
uint32_t result,
-        size_t input_rows_count, bool dry_run) const {
+Status PreparedFunctionImpl::default_execute(FunctionContext* context, Block& 
block,
+                                             const ColumnNumbers& args, 
uint32_t result,
+                                             size_t input_rows_count) const {
     bool executed = false;
 
-    RETURN_IF_ERROR(default_implementation_for_constant_arguments(
-            context, block, args, result, input_rows_count, dry_run, 
&executed));
+    RETURN_IF_ERROR(default_implementation_for_constant_arguments(context, 
block, args, result,
+                                                                  
input_rows_count, &executed));
     if (executed) {
         return Status::OK();
     }
 
-    return _execute_skipped_constant_deal(context, block, args, result, 
input_rows_count, dry_run);
+    return _execute_skipped_constant_deal(context, block, args, result, 
input_rows_count);
 }
 
 Status PreparedFunctionImpl::execute(FunctionContext* context, Block& block,
                                      const ColumnNumbers& args, uint32_t 
result,
-                                     size_t input_rows_count, bool dry_run) 
const {
-    return execute_without_low_cardinality_columns(context, block, args, 
result, input_rows_count,
-                                                   dry_run);
+                                     size_t input_rows_count) const {
+    return default_execute(context, block, args, result, input_rows_count);
 }
 
 void FunctionBuilderImpl::check_number_of_arguments(size_t 
number_of_arguments) const {
diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h
index e944ae15a09..3c9c389aa8a 100644
--- a/be/src/vec/functions/function.h
+++ b/be/src/vec/functions/function.h
@@ -98,7 +98,7 @@ public:
     virtual String get_name() const = 0;
 
     virtual Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                           uint32_t result, size_t input_rows_count, bool 
dry_run) const = 0;
+                           uint32_t result, size_t input_rows_count) const = 0;
 };
 
 using PreparedFunctionPtr = std::shared_ptr<IPreparedFunction>;
@@ -106,7 +106,7 @@ using PreparedFunctionPtr = 
std::shared_ptr<IPreparedFunction>;
 class PreparedFunctionImpl : public IPreparedFunction {
 public:
     Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                   uint32_t result, size_t input_rows_count, bool dry_run = 
false) const final;
+                   uint32_t result, size_t input_rows_count) const final;
 
     /** If the function have non-zero number of arguments,
       *  and if all arguments are constant, that we could automatically 
provide default implementation:
@@ -122,12 +122,6 @@ public:
     virtual bool need_replace_null_data_to_default() const { return false; }
 
 protected:
-    virtual Status execute_impl_dry_run(FunctionContext* context, Block& block,
-                                        const ColumnNumbers& arguments, 
uint32_t result,
-                                        size_t input_rows_count) const {
-        return execute_impl(context, block, arguments, result, 
input_rows_count);
-    }
-
     virtual Status execute_impl(FunctionContext* context, Block& block,
                                 const ColumnNumbers& arguments, uint32_t 
result,
                                 size_t input_rows_count) const = 0;
@@ -150,18 +144,16 @@ protected:
 private:
     Status default_implementation_for_nulls(FunctionContext* context, Block& 
block,
                                             const ColumnNumbers& args, 
uint32_t result,
-                                            size_t input_rows_count, bool 
dry_run,
-                                            bool* executed) const;
+                                            size_t input_rows_count, bool* 
executed) const;
     Status default_implementation_for_constant_arguments(FunctionContext* 
context, Block& block,
                                                          const ColumnNumbers& 
args, uint32_t result,
-                                                         size_t 
input_rows_count, bool dry_run,
+                                                         size_t 
input_rows_count,
                                                          bool* executed) const;
-    Status execute_without_low_cardinality_columns(FunctionContext* context, 
Block& block,
-                                                   const ColumnNumbers& 
arguments, uint32_t result,
-                                                   size_t input_rows_count, 
bool dry_run) const;
+    Status default_execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                           uint32_t result, size_t input_rows_count) const;
     Status _execute_skipped_constant_deal(FunctionContext* context, Block& 
block,
                                           const ColumnNumbers& args, uint32_t 
result,
-                                          size_t input_rows_count, bool 
dry_run) const;
+                                          size_t input_rows_count) const;
 };
 
 /// Function with known arguments and return type.
@@ -187,10 +179,10 @@ public:
     }
 
     Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                   uint32_t result, size_t input_rows_count, bool dry_run = 
false) const {
+                   uint32_t result, size_t input_rows_count) const {
         try {
             return prepare(context, block, arguments, result)
-                    ->execute(context, block, arguments, result, 
input_rows_count, dry_run);
+                    ->execute(context, block, arguments, result, 
input_rows_count);
         } catch (const Exception& e) {
             return e.to_status();
         }
@@ -391,7 +383,6 @@ public:
     }
 
     using PreparedFunctionImpl::execute;
-    using PreparedFunctionImpl::execute_impl_dry_run;
     using FunctionBuilderImpl::get_return_type_impl;
     using FunctionBuilderImpl::get_variadic_argument_types_impl;
     using FunctionBuilderImpl::get_return_type;
@@ -540,11 +531,10 @@ private:
 };
 
 using FunctionPtr = std::shared_ptr<IFunction>;
-
 /** Return ColumnNullable of src, with null map as OR-ed null maps of args 
columns in blocks.
   * Or ColumnConst(ColumnNullable) if the result is always NULL or if the 
result is constant and always not NULL.
   */
 ColumnPtr wrap_in_nullable(const ColumnPtr& src, const Block& block, const 
ColumnNumbers& args,
-                           uint32_t result, size_t input_rows_count);
+                           size_t input_rows_count);
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_rpc.h 
b/be/src/vec/functions/function_rpc.h
index 5d6a3432392..862654c73a4 100644
--- a/be/src/vec/functions/function_rpc.h
+++ b/be/src/vec/functions/function_rpc.h
@@ -72,7 +72,7 @@ public:
     String get_name() const override { return "RPCPreparedFunction: "; }
 
     Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                   uint32_t result, size_t input_rows_count, bool dry_run) 
const override {
+                   uint32_t result, size_t input_rows_count) const override {
         auto* fn = reinterpret_cast<RPCFnImpl*>(
                 context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
         return fn->vec_call(context, block, arguments, result, 
input_rows_count);
diff --git a/be/test/exprs/mock_vexpr.h b/be/test/exprs/mock_vexpr.h
index 720e2f0dd62..09f5daf4963 100644
--- a/be/test/exprs/mock_vexpr.h
+++ b/be/test/exprs/mock_vexpr.h
@@ -31,6 +31,8 @@ public:
     MOCK_CONST_METHOD0(expr_name, const std::string&());
     MOCK_CONST_METHOD3(execute, Status(VExprContext* context, 
vectorized::Block* block,
                                        int* result_column_id));
+    MOCK_CONST_METHOD3(execute_column, Status(VExprContext* context, const 
vectorized::Block* block,
+                                              ColumnPtr& result_column));
 }; // class MockVExpr
 
 } // namespace vectorized
diff --git a/be/test/exprs/virtual_slot_ref_test.cpp 
b/be/test/exprs/virtual_slot_ref_test.cpp
index 420b44c32d3..d1a378a06bb 100644
--- a/be/test/exprs/virtual_slot_ref_test.cpp
+++ b/be/test/exprs/virtual_slot_ref_test.cpp
@@ -171,6 +171,11 @@ TEST_F(VirtualSlotRefTest, 
EqualsFunction_WithDifferentTypes) {
         Status execute(VExprContext* context, Block* block, int* 
result_column_id) const override {
             return Status::OK();
         }
+        Status execute_column(VExprContext* context, const Block* block,
+                              ColumnPtr& result_column) const override {
+            return Status::OK();
+        }
+
         const std::string& expr_name() const override {
             static std::string name = "mock";
             return name;
@@ -286,6 +291,12 @@ TEST_F(VirtualSlotRefTest, EqualsFunction_TestAllBranches) 
{
         Status execute(VExprContext* context, Block* block, int* 
result_column_id) const override {
             return Status::OK();
         }
+
+        Status execute_column(VExprContext* context, const Block* block,
+                              ColumnPtr& result_column) const override {
+            return Status::OK();
+        }
+
         const std::string& expr_name() const override {
             static std::string name = "different";
             return name;
@@ -306,6 +317,10 @@ TEST_F(VirtualSlotRefTest, EqualsFunction_TestAllBranches) 
{
         Status execute(VExprContext* context, Block* block, int* 
result_column_id) const override {
             return Status::OK();
         }
+        Status execute_column(VExprContext* context, const Block* block,
+                              ColumnPtr& result_column) const override {
+            return Status::OK();
+        }
         const std::string& expr_name() const override {
             static std::string name = "non_virtual_slot_ref";
             return name;
diff --git a/be/test/olap/collection_statistics_test.cpp 
b/be/test/olap/collection_statistics_test.cpp
index 669f041fc2f..87c7e9d5c05 100644
--- a/be/test/olap/collection_statistics_test.cpp
+++ b/be/test/olap/collection_statistics_test.cpp
@@ -52,6 +52,11 @@ public:
         return Status::OK();
     }
 
+    Status execute_column(vectorized::VExprContext* context, const 
vectorized::Block* block,
+                          vectorized::ColumnPtr& result_column) const override 
{
+        return Status::OK();
+    }
+
     Status prepare(RuntimeState* state, const RowDescriptor& desc,
                    vectorized::VExprContext* context) override {
         return Status::OK();
diff --git a/be/test/vec/exprs/try_cast_expr_test.cpp 
b/be/test/vec/exprs/try_cast_expr_test.cpp
index acf66dfb4a7..75c2feb64b2 100644
--- a/be/test/vec/exprs/try_cast_expr_test.cpp
+++ b/be/test/vec/exprs/try_cast_expr_test.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 
 #include "runtime/primitive_type.h"
+#include "udf/udf.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
@@ -125,7 +126,7 @@ class MockVExprForTryCast : public VExpr {
 public:
     MockVExprForTryCast() = default;
     MOCK_CONST_METHOD0(clone, VExprSPtr());
-    MOCK_CONST_METHOD0(expr_name, const std::string&());
+    const std::string& expr_name() const override { return _expr_name; }
 
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override {
         auto int_type = std::make_shared<DataTypeInt32>();
@@ -138,6 +139,24 @@ public:
         *result_column_id = 0;
         return Status::OK();
     }
+
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
+        auto int_type = std::make_shared<DataTypeInt32>();
+        auto int_column = int_type->create_column();
+        for (int i = 0; i < 3; i++) {
+            Int32 x = i;
+            int_column->insert_data((const char*)&x, sizeof(Int32));
+        }
+        result_column = std::move(int_column);
+        return Status::OK();
+    }
+
+    DataTypePtr execute_type(const Block* block) const override {
+        return std::make_shared<DataTypeInt32>();
+    }
+
+    std::string _expr_name;
 };
 
 struct TryCastExprTest : public ::testing::Test {
diff --git a/be/test/vec/exprs/vsearch_expr_test.cpp 
b/be/test/vec/exprs/vsearch_expr_test.cpp
index e342b7b1645..7b33ca121d2 100644
--- a/be/test/vec/exprs/vsearch_expr_test.cpp
+++ b/be/test/vec/exprs/vsearch_expr_test.cpp
@@ -70,6 +70,10 @@ public:
     }
 
     Status execute(VExprContext*, Block*, int*) const override { return 
Status::OK(); }
+    Status execute_column(VExprContext* context, const Block* block,
+                          ColumnPtr& result_column) const override {
+        return Status::OK();
+    }
 };
 
 const std::string& intern_column_name(const std::string& name) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to