This is an automated email from the ASF dual-hosted git repository.

Mryange pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c27fd0ba968 [refactor](be) Add operator IO wrappers (#64139)
c27fd0ba968 is described below

commit c27fd0ba968daffe8329186205315f8b1cd147b7
Author: Mryange <[email protected]>
AuthorDate: Sat Jun 6 17:46:46 2026 +0800

    [refactor](be) Add operator IO wrappers (#64139)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Problem Summary:
    
    Pipeline operator source and sink paths need a common place to validate
    output and input blocks. Before this change, `sink` and `get_block` were
    the virtual override points, so common validation either had to stay in
    call sites or be duplicated across operator implementations.
    
    Root cause: the public operator data-flow entry points were also the
    polymorphic implementation hooks, which left no wrapper layer for shared
    checks.
    
    This change makes `DataSinkOperatorXBase::sink` and
    `OperatorXBase::get_block` non-virtual wrappers. The wrappers run
    `Block::check_type_and_column()` at the source/sink boundary and then
    dispatch to the new virtual `sink_impl` and `get_block_impl` methods.
    All pipeline operator implementations, exchange operators, scan
    operators, and related BE test mocks are migrated to the new impl
    methods. The scan projection path is updated to call the base
    `get_block` wrapper so the shared checks still apply.
---
 .../exec/exchange/local_exchange_sink_operator.cpp |  2 +-
 .../exec/exchange/local_exchange_sink_operator.h   |  2 +-
 .../exchange/local_exchange_source_operator.cpp    |  2 +-
 .../exec/exchange/local_exchange_source_operator.h |  2 +-
 be/src/exec/operator/aggregation_sink_operator.cpp |  2 +-
 be/src/exec/operator/aggregation_sink_operator.h   |  2 +-
 .../exec/operator/aggregation_source_operator.cpp  |  2 +-
 be/src/exec/operator/aggregation_source_operator.h |  2 +-
 be/src/exec/operator/analytic_sink_operator.cpp    |  2 +-
 be/src/exec/operator/analytic_sink_operator.h      |  2 +-
 be/src/exec/operator/analytic_source_operator.cpp  |  3 ++-
 be/src/exec/operator/analytic_source_operator.h    |  2 +-
 be/src/exec/operator/blackhole_sink_operator.cpp   |  2 +-
 be/src/exec/operator/blackhole_sink_operator.h     |  2 +-
 .../bucketed_aggregation_sink_operator.cpp         |  2 +-
 .../operator/bucketed_aggregation_sink_operator.h  |  2 +-
 .../bucketed_aggregation_source_operator.cpp       |  2 +-
 .../bucketed_aggregation_source_operator.h         |  2 +-
 be/src/exec/operator/cache_sink_operator.cpp       |  2 +-
 be/src/exec/operator/cache_sink_operator.h         |  2 +-
 be/src/exec/operator/cache_source_operator.cpp     |  2 +-
 be/src/exec/operator/cache_source_operator.h       |  2 +-
 be/src/exec/operator/datagen_operator.cpp          |  2 +-
 be/src/exec/operator/datagen_operator.h            |  2 +-
 be/src/exec/operator/dict_sink_operator.cpp        |  2 +-
 be/src/exec/operator/dict_sink_operator.h          |  2 +-
 be/src/exec/operator/empty_set_operator.cpp        |  2 +-
 be/src/exec/operator/empty_set_operator.h          |  2 +-
 be/src/exec/operator/exchange_sink_operator.cpp    |  2 +-
 be/src/exec/operator/exchange_sink_operator.h      |  2 +-
 be/src/exec/operator/exchange_source_operator.cpp  |  2 +-
 be/src/exec/operator/exchange_source_operator.h    |  2 +-
 .../operator/group_commit_block_sink_operator.cpp  |  2 +-
 .../operator/group_commit_block_sink_operator.h    |  2 +-
 .../exec/operator/group_commit_scan_operator.cpp   |  2 +-
 be/src/exec/operator/group_commit_scan_operator.h  |  2 +-
 be/src/exec/operator/hashjoin_build_sink.cpp       |  2 +-
 be/src/exec/operator/hashjoin_build_sink.h         |  2 +-
 be/src/exec/operator/hive_table_sink_operator.h    |  2 +-
 .../exec/operator/iceberg_delete_sink_operator.h   |  2 +-
 be/src/exec/operator/iceberg_merge_sink_operator.h |  2 +-
 be/src/exec/operator/iceberg_table_sink_operator.h |  2 +-
 be/src/exec/operator/jdbc_table_sink_operator.cpp  |  2 +-
 be/src/exec/operator/jdbc_table_sink_operator.h    |  2 +-
 .../operator/local_merge_sort_source_operator.cpp  |  2 +-
 .../operator/local_merge_sort_source_operator.h    |  2 +-
 .../exec/operator/maxcompute_table_sink_operator.h |  2 +-
 .../exec/operator/memory_scratch_sink_operator.cpp |  2 +-
 .../exec/operator/memory_scratch_sink_operator.h   |  2 +-
 be/src/exec/operator/mock_operator.h               |  2 +-
 be/src/exec/operator/mock_scan_operator.h          |  2 +-
 .../exec/operator/multi_cast_data_stream_sink.cpp  |  2 +-
 be/src/exec/operator/multi_cast_data_stream_sink.h |  2 +-
 .../operator/multi_cast_data_stream_source.cpp     |  4 ++--
 .../exec/operator/multi_cast_data_stream_source.h  |  2 +-
 .../operator/nested_loop_join_build_operator.cpp   |  3 ++-
 .../operator/nested_loop_join_build_operator.h     |  2 +-
 be/src/exec/operator/olap_table_sink_operator.h    |  2 +-
 be/src/exec/operator/olap_table_sink_v2_operator.h |  2 +-
 be/src/exec/operator/operator.cpp                  |  6 ++++--
 be/src/exec/operator/operator.h                    | 23 ++++++++++++++++------
 .../exec/operator/partition_sort_sink_operator.cpp |  2 +-
 .../exec/operator/partition_sort_sink_operator.h   |  2 +-
 .../operator/partition_sort_source_operator.cpp    |  4 ++--
 .../exec/operator/partition_sort_source_operator.h |  2 +-
 .../partitioned_aggregation_sink_operator.cpp      |  3 ++-
 .../partitioned_aggregation_sink_operator.h        |  2 +-
 .../partitioned_aggregation_source_operator.cpp    |  2 +-
 .../partitioned_aggregation_source_operator.h      |  2 +-
 .../partitioned_hash_join_probe_operator.cpp       |  3 ++-
 .../partitioned_hash_join_probe_operator.h         |  2 +-
 .../partitioned_hash_join_sink_operator.cpp        |  2 +-
 .../operator/partitioned_hash_join_sink_operator.h |  2 +-
 .../exec/operator/rec_cte_anchor_sink_operator.h   |  2 +-
 be/src/exec/operator/rec_cte_scan_operator.h       |  2 +-
 be/src/exec/operator/rec_cte_sink_operator.h       |  2 +-
 be/src/exec/operator/rec_cte_source_operator.h     |  2 +-
 be/src/exec/operator/result_file_sink_operator.cpp |  2 +-
 be/src/exec/operator/result_file_sink_operator.h   |  2 +-
 be/src/exec/operator/result_sink_operator.cpp      |  2 +-
 be/src/exec/operator/result_sink_operator.h        |  2 +-
 be/src/exec/operator/scan_operator.cpp             |  2 +-
 be/src/exec/operator/scan_operator.h               |  4 ++--
 be/src/exec/operator/schema_scan_operator.cpp      |  2 +-
 be/src/exec/operator/schema_scan_operator.h        |  2 +-
 be/src/exec/operator/set_probe_sink_operator.cpp   |  3 ++-
 be/src/exec/operator/set_probe_sink_operator.h     |  2 +-
 be/src/exec/operator/set_sink_operator.cpp         |  2 +-
 be/src/exec/operator/set_sink_operator.h           |  2 +-
 be/src/exec/operator/set_source_operator.cpp       |  3 ++-
 be/src/exec/operator/set_source_operator.h         |  2 +-
 be/src/exec/operator/sort_sink_operator.cpp        |  2 +-
 be/src/exec/operator/sort_sink_operator.h          |  2 +-
 be/src/exec/operator/sort_source_operator.cpp      |  2 +-
 be/src/exec/operator/sort_source_operator.h        |  2 +-
 .../operator/spill_iceberg_table_sink_operator.cpp |  2 +-
 .../operator/spill_iceberg_table_sink_operator.h   |  2 +-
 be/src/exec/operator/spill_sort_sink_operator.cpp  |  2 +-
 be/src/exec/operator/spill_sort_sink_operator.h    |  2 +-
 .../exec/operator/spill_sort_source_operator.cpp   |  2 +-
 be/src/exec/operator/spill_sort_source_operator.h  |  2 +-
 be/src/exec/operator/tvf_table_sink_operator.h     |  2 +-
 be/src/exec/operator/union_sink_operator.cpp       |  2 +-
 be/src/exec/operator/union_sink_operator.h         |  2 +-
 be/src/exec/operator/union_source_operator.cpp     |  2 +-
 be/src/exec/operator/union_source_operator.h       |  2 +-
 be/src/exec/pipeline/pipeline_task.cpp             |  3 +--
 be/test/exec/operator/agg_operator_test.cpp        |  2 +-
 .../exec/operator/analytic_sink_operator_test.cpp  |  4 +++-
 .../operator/partition_sort_sink_operator_test.cpp |  4 +++-
 .../operator/partitioned_aggregation_test_helper.h |  4 +++-
 .../operator/partitioned_hash_join_test_helper.h   |  4 +++-
 .../exec/operator/query_cache_operator_test.cpp    |  4 +++-
 be/test/exec/operator/sort_operator_test.cpp       |  4 +++-
 be/test/exec/operator/spill_sort_test_helper.h     |  2 +-
 .../exec/operator/streaming_agg_operator_test.cpp  |  4 +++-
 .../exec/operator/table_function_operator_test.cpp |  4 +++-
 be/test/testutil/mock/mock_operators.h             |  4 ++--
 be/test/util/profile_spec_test.cpp                 |  4 ++--
 119 files changed, 165 insertions(+), 131 deletions(-)

diff --git a/be/src/exec/exchange/local_exchange_sink_operator.cpp 
b/be/src/exec/exchange/local_exchange_sink_operator.cpp
index 2f9443208e7..0a2aeccd811 100644
--- a/be/src/exec/exchange/local_exchange_sink_operator.cpp
+++ b/be/src/exec/exchange/local_exchange_sink_operator.cpp
@@ -142,7 +142,7 @@ std::string LocalExchangeSinkLocalState::debug_string(int 
indentation_level) con
     return fmt::to_string(debug_string_buffer);
 }
 
-Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, Block* in_block, 
bool eos) {
+Status LocalExchangeSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/exchange/local_exchange_sink_operator.h 
b/be/src/exec/exchange/local_exchange_sink_operator.h
index f1ca1935457..ddaeb906b26 100644
--- a/be/src/exec/exchange/local_exchange_sink_operator.h
+++ b/be/src/exec/exchange/local_exchange_sink_operator.h
@@ -103,7 +103,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     void set_low_memory_mode(RuntimeState* state) override {
         auto& local_state = get_local_state(state);
diff --git a/be/src/exec/exchange/local_exchange_source_operator.cpp 
b/be/src/exec/exchange/local_exchange_source_operator.cpp
index ad092656f21..ace8573d704 100644
--- a/be/src/exec/exchange/local_exchange_source_operator.cpp
+++ b/be/src/exec/exchange/local_exchange_source_operator.cpp
@@ -89,7 +89,7 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
     return fmt::to_string(debug_string_buffer);
 }
 
-Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status LocalExchangeSourceOperatorX::get_block_impl(RuntimeState* state, 
Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._exchanger->get_block(
diff --git a/be/src/exec/exchange/local_exchange_source_operator.h 
b/be/src/exec/exchange/local_exchange_source_operator.h
index 3fdf90b50f0..c315680901f 100644
--- a/be/src/exec/exchange/local_exchange_source_operator.h
+++ b/be/src/exec/exchange/local_exchange_source_operator.h
@@ -78,7 +78,7 @@ public:
     RowDescriptor& row_descriptor() override { return 
_child->row_descriptor(); }
     const RowDescriptor& row_desc() const override { return 
_child->row_desc(); }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp 
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 8e40a53d3d4..4adaecec6a9 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -988,7 +988,7 @@ Status AggSinkOperatorX::_check_agg_fn_output() {
     return Status::OK();
 }
 
-Status AggSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, 
bool eos) {
+Status AggSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/aggregation_sink_operator.h 
b/be/src/exec/operator/aggregation_sink_operator.h
index 8d3273dde2a..4d103d54119 100644
--- a/be/src/exec/operator/aggregation_sink_operator.h
+++ b/be/src/exec/operator/aggregation_sink_operator.h
@@ -154,7 +154,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_partition_exprs.empty()) {
diff --git a/be/src/exec/operator/aggregation_source_operator.cpp 
b/be/src/exec/operator/aggregation_source_operator.cpp
index 0699d6a6002..945fb24abe8 100644
--- a/be/src/exec/operator/aggregation_source_operator.cpp
+++ b/be/src/exec/operator/aggregation_source_operator.cpp
@@ -562,7 +562,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
           _needs_finalize(tnode.agg_node.need_finalize),
           _without_key(tnode.agg_node.grouping_exprs.empty()) {}
 
-Status AggSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* 
eos) {
+Status AggSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/aggregation_source_operator.h 
b/be/src/exec/operator/aggregation_source_operator.h
index c9348826ca4..090f493ae28 100644
--- a/be/src/exec/operator/aggregation_source_operator.h
+++ b/be/src/exec/operator/aggregation_source_operator.h
@@ -102,7 +102,7 @@ public:
     AggSourceOperatorX() = default;
 #endif
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/analytic_sink_operator.cpp 
b/be/src/exec/operator/analytic_sink_operator.cpp
index f4fb437d31c..cd15757a6eb 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -744,7 +744,7 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* 
input_block, bool eos) {
+Status AnalyticSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* 
input_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/analytic_sink_operator.h 
b/be/src/exec/operator/analytic_sink_operator.h
index 9a64ba1d0b7..62d8bcde692 100644
--- a/be/src/exec/operator/analytic_sink_operator.h
+++ b/be/src/exec/operator/analytic_sink_operator.h
@@ -209,7 +209,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
diff --git a/be/src/exec/operator/analytic_source_operator.cpp 
b/be/src/exec/operator/analytic_source_operator.cpp
index 3d25b20c7a4..d3bb0de3975 100644
--- a/be/src/exec/operator/analytic_source_operator.cpp
+++ b/be/src/exec/operator/analytic_source_operator.cpp
@@ -42,7 +42,8 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* 
pool, const TPlanNo
                                                  int operator_id, const 
DescriptorTbl& descs)
         : OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs) {}
 
-Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* 
output_block, bool* eos) {
+Status AnalyticSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
output_block,
+                                               bool* eos) {
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/analytic_source_operator.h 
b/be/src/exec/operator/analytic_source_operator.h
index 696ec74cb69..72273ebd98a 100644
--- a/be/src/exec/operator/analytic_source_operator.h
+++ b/be/src/exec/operator/analytic_source_operator.h
@@ -46,7 +46,7 @@ public:
 #ifdef BE_TEST
     AnalyticSourceOperatorX() = default;
 #endif
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/blackhole_sink_operator.cpp 
b/be/src/exec/operator/blackhole_sink_operator.cpp
index 0745c3285cc..e8daabec852 100644
--- a/be/src/exec/operator/blackhole_sink_operator.cpp
+++ b/be/src/exec/operator/blackhole_sink_operator.cpp
@@ -44,7 +44,7 @@ Status BlackholeSinkOperatorX::init(const TDataSink& tsink) {
     return Status::OK();
 }
 
-Status BlackholeSinkOperatorX::sink(RuntimeState* state, Block* block, bool 
eos) {
+Status BlackholeSinkOperatorX::sink_impl(RuntimeState* state, Block* block, 
bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/blackhole_sink_operator.h 
b/be/src/exec/operator/blackhole_sink_operator.h
index 23a2e995306..8bb32e3d4f7 100644
--- a/be/src/exec/operator/blackhole_sink_operator.h
+++ b/be/src/exec/operator/blackhole_sink_operator.h
@@ -68,7 +68,7 @@ public:
 
     Status init(const TDataSink& tsink) override;
 
-    Status sink(RuntimeState* state, Block* block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* block, bool eos) override;
 
     Status close(RuntimeState* state) override;
 
diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp 
b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
index 8cb58b2d532..aee32541c5a 100644
--- a/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
@@ -472,7 +472,7 @@ Status BucketedAggSinkOperatorX::prepare(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status BucketedAggSinkOperatorX::sink(RuntimeState* state, Block* in_block, 
bool eos) {
+Status BucketedAggSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.h 
b/be/src/exec/operator/bucketed_aggregation_sink_operator.h
index 34b8103155e..4f2abe0a96a 100644
--- a/be/src/exec/operator/bucketed_aggregation_sink_operator.h
+++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.h
@@ -110,7 +110,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     // No local exchange needed — each instance builds its own hash tables 
independently.
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.cpp 
b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
index e1bd7108955..ac571248b5d 100644
--- a/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
@@ -730,7 +730,7 @@ 
BucketedAggSourceOperatorX::BucketedAggSourceOperatorX(ObjectPool* pool, const T
         : Base(pool, tnode, operator_id, descs),
           _needs_finalize(tnode.bucketed_agg_node.need_finalize) {}
 
-Status BucketedAggSourceOperatorX::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status BucketedAggSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.h 
b/be/src/exec/operator/bucketed_aggregation_source_operator.h
index 1a90c464ce7..454b6f067a2 100644
--- a/be/src/exec/operator/bucketed_aggregation_source_operator.h
+++ b/be/src/exec/operator/bucketed_aggregation_source_operator.h
@@ -108,7 +108,7 @@ public:
                                const DescriptorTbl& descs);
     ~BucketedAggSourceOperatorX() override = default;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/cache_sink_operator.cpp 
b/be/src/exec/operator/cache_sink_operator.cpp
index 88c1580f8d2..97c42b99c25 100644
--- a/be/src/exec/operator/cache_sink_operator.cpp
+++ b/be/src/exec/operator/cache_sink_operator.cpp
@@ -49,7 +49,7 @@ CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int 
child_id, int dest_id)
     _name = "CACHE_SINK_OPERATOR";
 }
 
-Status CacheSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool 
eos) {
+Status CacheSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block, 
bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/cache_sink_operator.h 
b/be/src/exec/operator/cache_sink_operator.h
index a644a89416c..6204b20eb95 100644
--- a/be/src/exec/operator/cache_sink_operator.h
+++ b/be/src/exec/operator/cache_sink_operator.h
@@ -57,7 +57,7 @@ public:
                                      
DataSinkOperatorX<CacheSinkLocalState>::_name);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override {
         std::shared_ptr<BasicSharedState> ss = 
std::make_shared<DataQueueSharedState>();
diff --git a/be/src/exec/operator/cache_source_operator.cpp 
b/be/src/exec/operator/cache_source_operator.cpp
index 6f2dc9e084e..ec7d947680a 100644
--- a/be/src/exec/operator/cache_source_operator.cpp
+++ b/be/src/exec/operator/cache_source_operator.cpp
@@ -118,7 +118,7 @@ std::string CacheSourceLocalState::debug_string(int 
indentation_level) const {
     return fmt::to_string(debug_string_buffer);
 }
 
-Status CacheSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status CacheSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
diff --git a/be/src/exec/operator/cache_source_operator.h 
b/be/src/exec/operator/cache_source_operator.h
index fed5e3bb4f2..3a68bd337fc 100644
--- a/be/src/exec/operator/cache_source_operator.h
+++ b/be/src/exec/operator/cache_source_operator.h
@@ -78,7 +78,7 @@ public:
 #endif
 
     ~CacheSourceOperatorX() override = default;
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/datagen_operator.cpp 
b/be/src/exec/operator/datagen_operator.cpp
index a51b4fc978d..a9e0003c779 100644
--- a/be/src/exec/operator/datagen_operator.cpp
+++ b/be/src/exec/operator/datagen_operator.cpp
@@ -60,7 +60,7 @@ Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status DataGenSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status DataGenSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
block, bool* eos) {
     if (state == nullptr || block == nullptr) {
         return Status::InternalError("input is NULL pointer");
     }
diff --git a/be/src/exec/operator/datagen_operator.h 
b/be/src/exec/operator/datagen_operator.h
index 376f5960483..7b2a5d4f69c 100644
--- a/be/src/exec/operator/datagen_operator.h
+++ b/be/src/exec/operator/datagen_operator.h
@@ -58,7 +58,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     [[nodiscard]] bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/dict_sink_operator.cpp 
b/be/src/exec/operator/dict_sink_operator.cpp
index bb0d1d9d146..8f8a5a13685 100644
--- a/be/src/exec/operator/dict_sink_operator.cpp
+++ b/be/src/exec/operator/dict_sink_operator.cpp
@@ -158,7 +158,7 @@ Status DictSinkOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status DictSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos) 
{
+Status DictSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block, bool 
eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/dict_sink_operator.h 
b/be/src/exec/operator/dict_sink_operator.h
index 2ae82b67a89..23747a99601 100644
--- a/be/src/exec/operator/dict_sink_operator.h
+++ b/be/src/exec/operator/dict_sink_operator.h
@@ -49,7 +49,7 @@ public:
                       const std::vector<TExpr>& dict_input_expr, const 
TDictionarySink& dict_sink);
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
 private:
     friend class DictSinkLocalState;
diff --git a/be/src/exec/operator/empty_set_operator.cpp 
b/be/src/exec/operator/empty_set_operator.cpp
index 58c69bf91bc..72c91997f4c 100644
--- a/be/src/exec/operator/empty_set_operator.cpp
+++ b/be/src/exec/operator/empty_set_operator.cpp
@@ -23,7 +23,7 @@
 
 namespace doris {
 
-Status EmptySetSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status EmptySetSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
block, bool* eos) {
     *eos = true;
     return Status::OK();
 }
diff --git a/be/src/exec/operator/empty_set_operator.h 
b/be/src/exec/operator/empty_set_operator.h
index 62dd855ea21..0f16236373c 100644
--- a/be/src/exec/operator/empty_set_operator.h
+++ b/be/src/exec/operator/empty_set_operator.h
@@ -42,7 +42,7 @@ public:
     EmptySetSourceOperatorX() = default;
 #endif
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     [[nodiscard]] bool is_source() const override { return true; }
 };
diff --git a/be/src/exec/operator/exchange_sink_operator.cpp 
b/be/src/exec/operator/exchange_sink_operator.cpp
index e65dd979ad2..2011a44d2ca 100644
--- a/be/src/exec/operator/exchange_sink_operator.cpp
+++ b/be/src/exec/operator/exchange_sink_operator.cpp
@@ -392,7 +392,7 @@ Status 
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPt
     return channel->close(state);
 }
 
-Status ExchangeSinkOperatorX::sink(RuntimeState* state, Block* block, bool 
eos) {
+Status ExchangeSinkOperatorX::sink_impl(RuntimeState* state, Block* block, 
bool eos) {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state.rows_input_counter(),
                    (int64_t)block->rows()); // for auto-partition, may decease 
when do_partitioning
diff --git a/be/src/exec/operator/exchange_sink_operator.h 
b/be/src/exec/operator/exchange_sink_operator.h
index ea224ed99bd..10351154d1d 100644
--- a/be/src/exec/operator/exchange_sink_operator.h
+++ b/be/src/exec/operator/exchange_sink_operator.h
@@ -198,7 +198,7 @@ public:
     // TaskExecutionContext.
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     bool is_serial_operator() const override { return true; }
     void set_low_memory_mode(RuntimeState* state) override {
diff --git a/be/src/exec/operator/exchange_source_operator.cpp 
b/be/src/exec/operator/exchange_source_operator.cpp
index e008d599078..1e4d24de8e8 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -146,7 +146,7 @@ Status ExchangeSourceOperatorX::prepare(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status ExchangeSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     Defer is_eos([&]() {
         if (*eos) {
diff --git a/be/src/exec/operator/exchange_source_operator.h 
b/be/src/exec/operator/exchange_source_operator.h
index da00e088586..0c4aa1e43eb 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -100,7 +100,7 @@ public:
 
     Status reset(RuntimeState* state) override;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     std::string debug_string(int indentation_level = 0) const override;
 
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp 
b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index a72755720d5..38ea6d3c18a 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -280,7 +280,7 @@ Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* 
state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
-Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* 
input_block, bool eos) {
+Status GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* 
input_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.h 
b/be/src/exec/operator/group_commit_block_sink_operator.h
index 644100d42b0..854f2f3cc1d 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.h
+++ b/be/src/exec/operator/group_commit_block_sink_operator.h
@@ -102,7 +102,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* block, bool eos) override;
 
 private:
     friend class GroupCommitBlockSinkLocalState;
diff --git a/be/src/exec/operator/group_commit_scan_operator.cpp 
b/be/src/exec/operator/group_commit_scan_operator.cpp
index e591ed22edd..209481d86bf 100644
--- a/be/src/exec/operator/group_commit_scan_operator.cpp
+++ b/be/src/exec/operator/group_commit_scan_operator.cpp
@@ -28,7 +28,7 @@ GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool, 
const TPlanNode& tn
     _output_tuple_id = tnode.file_scan_node.tuple_id;
 }
 
-Status GroupCommitOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status GroupCommitOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     bool find_node = false;
diff --git a/be/src/exec/operator/group_commit_scan_operator.h 
b/be/src/exec/operator/group_commit_scan_operator.h
index 7ed01c16931..679b46b1125 100644
--- a/be/src/exec/operator/group_commit_scan_operator.h
+++ b/be/src/exec/operator/group_commit_scan_operator.h
@@ -54,7 +54,7 @@ public:
     GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                          const DescriptorTbl& descs, int parallel_tasks);
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
 protected:
     friend class GroupCommitLocalState;
diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp 
b/be/src/exec/operator/hashjoin_build_sink.cpp
index 7d295588710..14772cf3662 100644
--- a/be/src/exec/operator/hashjoin_build_sink.cpp
+++ b/be/src/exec/operator/hashjoin_build_sink.cpp
@@ -814,7 +814,7 @@ Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* 
state) {
     return VExpr::open(_build_expr_ctxs, state);
 }
 
-Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, Block* in_block, 
bool eos) {
+Status HashJoinBuildSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/hashjoin_build_sink.h 
b/be/src/exec/operator/hashjoin_build_sink.h
index 3c3faabcdb5..dda24cef91b 100644
--- a/be/src/exec/operator/hashjoin_build_sink.h
+++ b/be/src/exec/operator/hashjoin_build_sink.h
@@ -118,7 +118,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
diff --git a/be/src/exec/operator/hive_table_sink_operator.h 
b/be/src/exec/operator/hive_table_sink_operator.h
index a18af89f5df..51161809e68 100644
--- a/be/src/exec/operator/hive_table_sink_operator.h
+++ b/be/src/exec/operator/hive_table_sink_operator.h
@@ -65,7 +65,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_delete_sink_operator.h 
b/be/src/exec/operator/iceberg_delete_sink_operator.h
index f1d292a8b59..12f93bfc8b6 100644
--- a/be/src/exec/operator/iceberg_delete_sink_operator.h
+++ b/be/src/exec/operator/iceberg_delete_sink_operator.h
@@ -64,7 +64,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_merge_sink_operator.h 
b/be/src/exec/operator/iceberg_merge_sink_operator.h
index 6de1fbe59eb..0cf64681c1d 100644
--- a/be/src/exec/operator/iceberg_merge_sink_operator.h
+++ b/be/src/exec/operator/iceberg_merge_sink_operator.h
@@ -63,7 +63,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_table_sink_operator.h 
b/be/src/exec/operator/iceberg_table_sink_operator.h
index 2a9b59d7e36..0dec306edeb 100644
--- a/be/src/exec/operator/iceberg_table_sink_operator.h
+++ b/be/src/exec/operator/iceberg_table_sink_operator.h
@@ -64,7 +64,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/jdbc_table_sink_operator.cpp 
b/be/src/exec/operator/jdbc_table_sink_operator.cpp
index ac5354df988..9ee8828714d 100644
--- a/be/src/exec/operator/jdbc_table_sink_operator.cpp
+++ b/be/src/exec/operator/jdbc_table_sink_operator.cpp
@@ -46,7 +46,7 @@ Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status JdbcTableSinkOperatorX::sink(RuntimeState* state, Block* block, bool 
eos) {
+Status JdbcTableSinkOperatorX::sink_impl(RuntimeState* state, Block* block, 
bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/jdbc_table_sink_operator.h 
b/be/src/exec/operator/jdbc_table_sink_operator.h
index 58a8e52cc7f..a557ec80a8d 100644
--- a/be/src/exec/operator/jdbc_table_sink_operator.h
+++ b/be/src/exec/operator/jdbc_table_sink_operator.h
@@ -45,7 +45,7 @@ public:
     Status init(const TDataSink& thrift_sink) override;
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
 private:
     friend class JdbcTableSinkLocalState;
diff --git a/be/src/exec/operator/local_merge_sort_source_operator.cpp 
b/be/src/exec/operator/local_merge_sort_source_operator.cpp
index dacd97357d3..dc3bf09d7f8 100644
--- a/be/src/exec/operator/local_merge_sort_source_operator.cpp
+++ b/be/src/exec/operator/local_merge_sort_source_operator.cpp
@@ -116,7 +116,7 @@ void 
LocalMergeSortSourceOperatorX::init_dependencies_and_sorter() {
     _sorters.resize(_parallel_tasks);
 }
 
-Status LocalMergeSortSourceOperatorX::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status LocalMergeSortSourceOperatorX::get_block_impl(RuntimeState* state, 
Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/local_merge_sort_source_operator.h 
b/be/src/exec/operator/local_merge_sort_source_operator.h
index 6219edc4224..b22193c3686 100644
--- a/be/src/exec/operator/local_merge_sort_source_operator.h
+++ b/be/src/exec/operator/local_merge_sort_source_operator.h
@@ -83,7 +83,7 @@ public:
     LocalMergeSortSourceOperatorX() : _merge_by_exchange(false), _offset(0) {}
 #endif
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
diff --git a/be/src/exec/operator/maxcompute_table_sink_operator.h 
b/be/src/exec/operator/maxcompute_table_sink_operator.h
index e1ec3b20ab1..3332143080b 100644
--- a/be/src/exec/operator/maxcompute_table_sink_operator.h
+++ b/be/src/exec/operator/maxcompute_table_sink_operator.h
@@ -62,7 +62,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/memory_scratch_sink_operator.cpp 
b/be/src/exec/operator/memory_scratch_sink_operator.cpp
index ff9e4d4f508..5635a65168f 100644
--- a/be/src/exec/operator/memory_scratch_sink_operator.cpp
+++ b/be/src/exec/operator/memory_scratch_sink_operator.cpp
@@ -86,7 +86,7 @@ Status MemoryScratchSinkOperatorX::prepare(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, Block* 
input_block, bool eos) {
+Status MemoryScratchSinkOperatorX::sink_impl(RuntimeState* state, Block* 
input_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (nullptr == input_block || 0 == input_block->rows()) {
diff --git a/be/src/exec/operator/memory_scratch_sink_operator.h 
b/be/src/exec/operator/memory_scratch_sink_operator.h
index 77fadc0c8da..f12e118e75e 100644
--- a/be/src/exec/operator/memory_scratch_sink_operator.h
+++ b/be/src/exec/operator/memory_scratch_sink_operator.h
@@ -57,7 +57,7 @@ public:
     Status init(const TDataSink& thrift_sink) override;
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
 private:
     friend class MemoryScratchSinkLocalState;
diff --git a/be/src/exec/operator/mock_operator.h 
b/be/src/exec/operator/mock_operator.h
index a83b5de5448..3c8d133def4 100644
--- a/be/src/exec/operator/mock_operator.h
+++ b/be/src/exec/operator/mock_operator.h
@@ -42,7 +42,7 @@ public:
     ENABLE_FACTORY_CREATOR(MockOperatorX);
     MockOperatorX() = default;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         if (_outout_blocks.empty()) {
             *eos = true;
             return Status::OK();
diff --git a/be/src/exec/operator/mock_scan_operator.h 
b/be/src/exec/operator/mock_scan_operator.h
index 3250a4219b3..2217dc6f87b 100644
--- a/be/src/exec/operator/mock_scan_operator.h
+++ b/be/src/exec/operator/mock_scan_operator.h
@@ -88,7 +88,7 @@ public:
         _output_blocks.push_back(std::move(block));
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         if (_output_blocks.empty()) {
             *eos = true;
             return Status::OK();
diff --git a/be/src/exec/operator/multi_cast_data_stream_sink.cpp 
b/be/src/exec/operator/multi_cast_data_stream_sink.cpp
index 2d2ddcce458..7a13a05eaeb 100644
--- a/be/src/exec/operator/multi_cast_data_stream_sink.cpp
+++ b/be/src/exec/operator/multi_cast_data_stream_sink.cpp
@@ -66,7 +66,7 @@ std::string 
MultiCastDataStreamSinkLocalState::debug_string(int indentation_leve
     return fmt::to_string(debug_string_buffer);
 }
 
-Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state, Block* 
in_block, bool eos) {
+Status MultiCastDataStreamSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (in_block->rows() > 0 || eos) {
diff --git a/be/src/exec/operator/multi_cast_data_stream_sink.h 
b/be/src/exec/operator/multi_cast_data_stream_sink.h
index d1fcc878924..0b0dc7ca537 100644
--- a/be/src/exec/operator/multi_cast_data_stream_sink.h
+++ b/be/src/exec/operator/multi_cast_data_stream_sink.h
@@ -56,7 +56,7 @@ public:
               _num_dests(sources.size()) {}
     ~MultiCastDataStreamSinkOperatorX() override = default;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override;
 
diff --git a/be/src/exec/operator/multi_cast_data_stream_source.cpp 
b/be/src/exec/operator/multi_cast_data_stream_source.cpp
index 8a669d3fbff..2b9e663ce07 100644
--- a/be/src/exec/operator/multi_cast_data_stream_source.cpp
+++ b/be/src/exec/operator/multi_cast_data_stream_source.cpp
@@ -79,8 +79,8 @@ Status 
MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
     return Base::close(state);
 }
 
-Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, 
Block* block,
-                                                       bool* eos) {
+Status MultiCastDataStreamerSourceOperatorX::get_block_impl(RuntimeState* 
state, Block* block,
+                                                            bool* eos) {
     //auto& local_state = get_local_state(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/multi_cast_data_stream_source.h 
b/be/src/exec/operator/multi_cast_data_stream_source.h
index f7ed78e376c..31488df78da 100644
--- a/be/src/exec/operator/multi_cast_data_stream_source.h
+++ b/be/src/exec/operator/multi_cast_data_stream_source.h
@@ -104,7 +104,7 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.cpp 
b/be/src/exec/operator/nested_loop_join_build_operator.cpp
index b55b22d1a58..857ac9318f2 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_build_operator.cpp
@@ -93,7 +93,8 @@ Status 
NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
     return VExpr::open(_filter_src_expr_ctxs, state);
 }
 
-Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, 
Block* block, bool eos) {
+Status NestedLoopJoinBuildSinkOperatorX::sink_impl(doris::RuntimeState* state, 
Block* block,
+                                                   bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.h 
b/be/src/exec/operator/nested_loop_join_build_operator.h
index 265a8767b3e..5984aeedf05 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.h
+++ b/be/src/exec/operator/nested_loop_join_build_operator.h
@@ -64,7 +64,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/exec/operator/olap_table_sink_operator.h 
b/be/src/exec/operator/olap_table_sink_operator.h
index e8261daf46b..9567b82e082 100644
--- a/be/src/exec/operator/olap_table_sink_operator.h
+++ b/be/src/exec/operator/olap_table_sink_operator.h
@@ -57,7 +57,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/olap_table_sink_v2_operator.h 
b/be/src/exec/operator/olap_table_sink_v2_operator.h
index 358cb4c10e2..038484c83ee 100644
--- a/be/src/exec/operator/olap_table_sink_v2_operator.h
+++ b/be/src/exec/operator/olap_table_sink_v2_operator.h
@@ -58,7 +58,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/operator.cpp 
b/be/src/exec/operator/operator.cpp
index 1ce7dc8727d..b833ca83ff3 100644
--- a/be/src/exec/operator/operator.cpp
+++ b/be/src/exec/operator/operator.cpp
@@ -729,13 +729,15 @@ Status 
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
 }
 
 template <typename LocalStateType>
-Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, 
Block* block, bool* eos) {
+Status StreamingOperatorX<LocalStateType>::get_block_impl(RuntimeState* state, 
Block* block,
+                                                          bool* eos) {
     
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(state,
 block, eos));
     return pull(state, block, eos);
 }
 
 template <typename LocalStateType>
-Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, 
Block* block, bool* eos) {
+Status StatefulOperatorX<LocalStateType>::get_block_impl(RuntimeState* state, 
Block* block,
+                                                         bool* eos) {
     auto& local_state = get_local_state(state);
     if (need_more_input_data(state)) {
         local_state._child_block->clear_column_data(
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 565a650a0da..a09638f6d71 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -620,7 +620,12 @@ public:
         return result.value()->is_finished();
     }
 
-    [[nodiscard]] virtual Status sink(RuntimeState* state, Block* block, bool 
eos) = 0;
+    [[nodiscard]] Status sink(RuntimeState* state, Block* block, bool eos) {
+        RETURN_IF_ERROR(block->check_type_and_column());
+        return sink_impl(state, block, eos);
+    }
+
+    [[nodiscard]] virtual Status sink_impl(RuntimeState* state, Block* block, 
bool eos) = 0;
 
     [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
                                                    LocalSinkStateInfo& info) = 
0;
@@ -877,7 +882,13 @@ public:
     Status prepare(RuntimeState* state) override;
 
     Status terminate(RuntimeState* state) override;
-    [[nodiscard]] virtual Status get_block(RuntimeState* state, Block* block, 
bool* eos) = 0;
+    [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool* 
eos) {
+        RETURN_IF_ERROR(get_block_impl(state, block, eos));
+        RETURN_IF_ERROR(block->check_type_and_column());
+        return Status::OK();
+    }
+
+    [[nodiscard]] virtual Status get_block_impl(RuntimeState* state, Block* 
block, bool* eos) = 0;
 
     Status close(RuntimeState* state) override;
 
@@ -1070,7 +1081,7 @@ public:
 
     virtual ~StreamingOperatorX() = default;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     virtual Status pull(RuntimeState* state, Block* block, bool* eos) = 0;
 };
@@ -1096,7 +1107,7 @@ public:
 
     using OperatorX<LocalStateType>::get_local_state;
 
-    [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool* 
eos) override;
+    [[nodiscard]] Status get_block_impl(RuntimeState* state, Block* block, 
bool* eos) override;
 
     [[nodiscard]] virtual Status pull(RuntimeState* state, Block* block, bool* 
eos) const = 0;
     [[nodiscard]] virtual Status push(RuntimeState* state, Block* input_block, 
bool eos) const = 0;
@@ -1170,7 +1181,7 @@ public:
 
     [[nodiscard]] bool is_source() const override { return true; }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         *eos = _eos;
         return Status::OK();
     }
@@ -1225,7 +1236,7 @@ class DummySinkOperatorX final : public 
DataSinkOperatorX<DummySinkLocalState> {
 public:
     DummySinkOperatorX(int op_id, int node_id, int dest_id)
             : DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id) 
{}
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         return _return_eof ? Status::Error<ErrorCode::END_OF_FILE>("source 
have closed")
                            : Status::OK();
     }
diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp 
b/be/src/exec/operator/partition_sort_sink_operator.cpp
index 76695b8ee32..0f358fb70ad 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.cpp
+++ b/be/src/exec/operator/partition_sort_sink_operator.cpp
@@ -111,7 +111,7 @@ Status PartitionSortSinkOperatorX::prepare(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* 
input_block, bool eos) {
+Status PartitionSortSinkOperatorX::sink_impl(RuntimeState* state, Block* 
input_block, bool eos) {
     auto& local_state = get_local_state(state);
     auto current_rows = input_block->rows();
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/partition_sort_sink_operator.h 
b/be/src/exec/operator/partition_sort_sink_operator.h
index 48b709d3f1f..68a698dff8f 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.h
+++ b/be/src/exec/operator/partition_sort_sink_operator.h
@@ -92,7 +92,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
     Status prepare(RuntimeState* state) override;
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
             return DataDistribution(ExchangeType::HASH_SHUFFLE, 
_distribute_exprs);
diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp 
b/be/src/exec/operator/partition_sort_source_operator.cpp
index 3db89fb4cd2..7e07cd9d307 100644
--- a/be/src/exec/operator/partition_sort_source_operator.cpp
+++ b/be/src/exec/operator/partition_sort_source_operator.cpp
@@ -32,8 +32,8 @@ Status PartitionSortSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     return Status::OK();
 }
 
-Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* 
output_block,
-                                               bool* eos) {
+Status PartitionSortSourceOperatorX::get_block_impl(RuntimeState* state, 
Block* output_block,
+                                                    bool* eos) {
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/partition_sort_source_operator.h 
b/be/src/exec/operator/partition_sort_source_operator.h
index 82bf052f1c0..def4cab8ea0 100644
--- a/be/src/exec/operator/partition_sort_source_operator.h
+++ b/be/src/exec/operator/partition_sort_source_operator.h
@@ -52,7 +52,7 @@ public:
 #ifdef BE_TEST
     PartitionSortSourceOperatorX() = default;
 #endif
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index 42739b9b2ac..5563eafdc50 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -149,7 +149,8 @@ Status PartitionedAggSinkOperatorX::prepare(RuntimeState* 
state) {
     return _agg_sink_operator->prepare(state);
 }
 
-Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, Block* 
in_block, bool eos) {
+Status PartitionedAggSinkOperatorX::sink_impl(doris::RuntimeState* state, 
Block* in_block,
+                                              bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.h 
b/be/src/exec/operator/partitioned_aggregation_sink_operator.h
index 56834757bd3..b7916f755c4 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.h
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.h
@@ -114,7 +114,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
                          bool require_bucket_distribution) override {
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 4d69c49bb7a..c23d3c83dda 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -226,7 +226,7 @@ Status 
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status PartitionedAggSourceOperatorX::get_block_impl(RuntimeState* state, 
Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     Status status;
 
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.h 
b/be/src/exec/operator/partitioned_aggregation_source_operator.h
index 3e631d7c10c..540b60064bb 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.h
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.h
@@ -126,7 +126,7 @@ public:
 
     Status close(RuntimeState* state) override;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 4810aca89fc..410556911f0 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -1013,7 +1013,8 @@ Status 
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
     return local_state.revoke_build_data(state);
 }
 
-Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, 
Block* block, bool* eos) {
+Status PartitionedHashJoinProbeOperatorX::get_block_impl(RuntimeState* state, 
Block* block,
+                                                         bool* eos) {
     *eos = false;
     auto& local_state = get_local_state(state);
     const bool is_spilled = local_state._shared_state->_is_spilled;
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.h 
b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
index 15767504d85..01787014c8d 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.h
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
@@ -222,7 +222,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
 
-    [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool* 
eos) override;
+    [[nodiscard]] Status get_block_impl(RuntimeState* state, Block* block, 
bool* eos) override;
 
     Status push(RuntimeState* state, Block* input_block, bool eos) const 
override;
     Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) 
const override;
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index 4a0d4d0f42e..11c137ba8c2 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -513,7 +513,7 @@ void 
PartitionedHashJoinSinkLocalState::update_profile_from_inner() {
 
 #undef UPDATE_COUNTER_FROM_INNER
 
-Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, Block* 
in_block, bool eos) {
+Status PartitionedHashJoinSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     const auto rows = in_block->rows();
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.h 
b/be/src/exec/operator/partitioned_hash_join_sink_operator.h
index c6dc2cdfb94..f48fe8f7137 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.h
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.h
@@ -115,7 +115,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     bool should_dry_run(RuntimeState* state) override { return false; }
 
diff --git a/be/src/exec/operator/rec_cte_anchor_sink_operator.h 
b/be/src/exec/operator/rec_cte_anchor_sink_operator.h
index 42661ee4181..4e8596497ad 100644
--- a/be/src/exec/operator/rec_cte_anchor_sink_operator.h
+++ b/be/src/exec/operator/rec_cte_anchor_sink_operator.h
@@ -79,7 +79,7 @@ public:
         return Base::close(state);
     }
 
-    Status sink(RuntimeState* state, Block* input_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* input_block, bool eos) 
override {
         auto& local_state = get_local_state(state);
 
         RETURN_IF_ERROR(_notify_rec_side_ready_if_needed(state));
diff --git a/be/src/exec/operator/rec_cte_scan_operator.h 
b/be/src/exec/operator/rec_cte_scan_operator.h
index adea08c4188..c374fe509ca 100644
--- a/be/src/exec/operator/rec_cte_scan_operator.h
+++ b/be/src/exec/operator/rec_cte_scan_operator.h
@@ -68,7 +68,7 @@ public:
                         const DescriptorTbl& descs)
             : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) 
{}
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         auto& local_state = get_local_state(state);
 
         if (local_state._blocks.empty()) {
diff --git a/be/src/exec/operator/rec_cte_sink_operator.h 
b/be/src/exec/operator/rec_cte_sink_operator.h
index a071cf55761..a351effb34a 100644
--- a/be/src/exec/operator/rec_cte_sink_operator.h
+++ b/be/src/exec/operator/rec_cte_sink_operator.h
@@ -80,7 +80,7 @@ public:
         return {ExchangeType::NOOP};
     }
 
-    Status sink(RuntimeState* state, Block* input_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* input_block, bool eos) 
override {
         auto& local_state = get_local_state(state);
 
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/rec_cte_source_operator.h 
b/be/src/exec/operator/rec_cte_source_operator.h
index 83e55937998..07f1c2c39a6 100644
--- a/be/src/exec/operator/rec_cte_source_operator.h
+++ b/be/src/exec/operator/rec_cte_source_operator.h
@@ -209,7 +209,7 @@ public:
         return {ExchangeType::NOOP};
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         auto& local_state = get_local_state(state);
         auto& ctx = local_state._shared_state;
         ctx->update_ready_to_return();
diff --git a/be/src/exec/operator/result_file_sink_operator.cpp 
b/be/src/exec/operator/result_file_sink_operator.cpp
index edc12412fce..7207a0dc503 100644
--- a/be/src/exec/operator/result_file_sink_operator.cpp
+++ b/be/src/exec/operator/result_file_sink_operator.cpp
@@ -150,7 +150,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
     return Base::close(state, exec_status);
 }
 
-Status ResultFileSinkOperatorX::sink(RuntimeState* state, Block* in_block, 
bool eos) {
+Status ResultFileSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/result_file_sink_operator.h 
b/be/src/exec/operator/result_file_sink_operator.h
index 8c759456b8e..6151918df9b 100644
--- a/be/src/exec/operator/result_file_sink_operator.h
+++ b/be/src/exec/operator/result_file_sink_operator.h
@@ -61,7 +61,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
 private:
     friend class ResultFileSinkLocalState;
diff --git a/be/src/exec/operator/result_sink_operator.cpp 
b/be/src/exec/operator/result_sink_operator.cpp
index 2384a2e5f60..021ffb60983 100644
--- a/be/src/exec/operator/result_sink_operator.cpp
+++ b/be/src/exec/operator/result_sink_operator.cpp
@@ -132,7 +132,7 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
-Status ResultSinkOperatorX::sink(RuntimeState* state, Block* block, bool eos) {
+Status ResultSinkOperatorX::sink_impl(RuntimeState* state, Block* block, bool 
eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/result_sink_operator.h 
b/be/src/exec/operator/result_sink_operator.h
index b3a7c504a5d..4ead2985d85 100644
--- a/be/src/exec/operator/result_sink_operator.h
+++ b/be/src/exec/operator/result_sink_operator.h
@@ -159,7 +159,7 @@ public:
                         const std::vector<TExpr>& select_exprs, const 
TResultSink& sink);
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
 private:
     friend class ResultSinkLocalState;
diff --git a/be/src/exec/operator/scan_operator.cpp 
b/be/src/exec/operator/scan_operator.cpp
index 7904413f77b..8498512b16b 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1348,7 +1348,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
 }
 
 template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status ScanOperatorX<LocalStateType>::get_block_impl(RuntimeState* state, 
Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h
index f3d65df35ad..bebcc7ec708 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -360,9 +360,9 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     Status get_block_after_projects(RuntimeState* state, Block* block, bool* 
eos) override {
-        Status status = get_block(state, block, eos);
+        Status status = OperatorX<LocalStateType>::get_block(state, block, 
eos);
         if (status.ok()) {
             
state->get_local_state(operator_id())->update_output_block_counters(*block);
         }
diff --git a/be/src/exec/operator/schema_scan_operator.cpp 
b/be/src/exec/operator/schema_scan_operator.cpp
index 3d5922573b9..f50d6a003bb 100644
--- a/be/src/exec/operator/schema_scan_operator.cpp
+++ b/be/src/exec/operator/schema_scan_operator.cpp
@@ -208,7 +208,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaScanOperatorX::get_block(RuntimeState* state, Block* block, bool* 
eos) {
+Status SchemaScanOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/operator/schema_scan_operator.h 
b/be/src/exec/operator/schema_scan_operator.h
index 49729cabf33..fc7c79067d9 100644
--- a/be/src/exec/operator/schema_scan_operator.h
+++ b/be/src/exec/operator/schema_scan_operator.h
@@ -63,7 +63,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     [[nodiscard]] bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/set_probe_sink_operator.cpp 
b/be/src/exec/operator/set_probe_sink_operator.cpp
index 6954c93ee65..c3fb08a3725 100644
--- a/be/src/exec/operator/set_probe_sink_operator.cpp
+++ b/be/src/exec/operator/set_probe_sink_operator.cpp
@@ -61,7 +61,8 @@ Status 
SetProbeSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
 }
 
 template <bool is_intersect>
-Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block* 
in_block, bool eos) {
+Status SetProbeSinkOperatorX<is_intersect>::sink_impl(RuntimeState* state, 
Block* in_block,
+                                                      bool eos) {
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/set_probe_sink_operator.h 
b/be/src/exec/operator/set_probe_sink_operator.h
index c736e2af229..7c294fe7254 100644
--- a/be/src/exec/operator/set_probe_sink_operator.h
+++ b/be/src/exec/operator/set_probe_sink_operator.h
@@ -101,7 +101,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
diff --git a/be/src/exec/operator/set_sink_operator.cpp 
b/be/src/exec/operator/set_sink_operator.cpp
index ec2c717c2cf..614a45afbc8 100644
--- a/be/src/exec/operator/set_sink_operator.cpp
+++ b/be/src/exec/operator/set_sink_operator.cpp
@@ -67,7 +67,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* 
state, Status exec_s
 }
 
 template <bool is_intersect>
-Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block* 
in_block, bool eos) {
+Status SetSinkOperatorX<is_intersect>::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
 
diff --git a/be/src/exec/operator/set_sink_operator.h 
b/be/src/exec/operator/set_sink_operator.h
index e1c937f1471..5ac71634cd8 100644
--- a/be/src/exec/operator/set_sink_operator.h
+++ b/be/src/exec/operator/set_sink_operator.h
@@ -112,7 +112,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
diff --git a/be/src/exec/operator/set_source_operator.cpp 
b/be/src/exec/operator/set_source_operator.cpp
index 5cc299e7dbb..63afdae0814 100644
--- a/be/src/exec/operator/set_source_operator.cpp
+++ b/be/src/exec/operator/set_source_operator.cpp
@@ -74,7 +74,8 @@ Status SetSourceLocalState<is_intersect>::open(RuntimeState* 
state) {
 }
 
 template <bool is_intersect>
-Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, Block* 
block, bool* eos) {
+Status SetSourceOperatorX<is_intersect>::get_block_impl(RuntimeState* state, 
Block* block,
+                                                        bool* eos) {
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/set_source_operator.h 
b/be/src/exec/operator/set_source_operator.h
index 40887f35fab..cacfd6a335f 100644
--- a/be/src/exec/operator/set_source_operator.h
+++ b/be/src/exec/operator/set_source_operator.h
@@ -85,7 +85,7 @@ public:
                             : DataDistribution(ExchangeType::HASH_SHUFFLE);
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     Status set_child(OperatorPtr child) override {
         Base::_child = child;
         return Status::OK();
diff --git a/be/src/exec/operator/sort_sink_operator.cpp 
b/be/src/exec/operator/sort_sink_operator.cpp
index 9b045da6d8c..9c77b52bd02 100644
--- a/be/src/exec/operator/sort_sink_operator.cpp
+++ b/be/src/exec/operator/sort_sink_operator.cpp
@@ -131,7 +131,7 @@ Status SortSinkOperatorX::prepare(RuntimeState* state) {
     return VExpr::open(_ordering_expr_ctxs, state);
 }
 
-Status SortSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, 
bool eos) {
+Status SortSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/sort_sink_operator.h 
b/be/src/exec/operator/sort_sink_operator.h
index eda5bb31b53..b0c66483b9b 100644
--- a/be/src/exec/operator/sort_sink_operator.h
+++ b/be/src/exec/operator/sort_sink_operator.h
@@ -76,7 +76,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
     Status prepare(RuntimeState* state) override;
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_is_analytic_sort) {
             return _is_colocate && _require_bucket_distribution
diff --git a/be/src/exec/operator/sort_source_operator.cpp 
b/be/src/exec/operator/sort_source_operator.cpp
index 681411d46ef..88ab9261b4a 100644
--- a/be/src/exec/operator/sort_source_operator.cpp
+++ b/be/src/exec/operator/sort_source_operator.cpp
@@ -30,7 +30,7 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnod
                                          const DescriptorTbl& descs)
         : OperatorX<SortLocalState>(pool, tnode, operator_id, descs) {}
 
-Status SortSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* 
eos) {
+Status SortSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/sort_source_operator.h 
b/be/src/exec/operator/sort_source_operator.h
index c2a63b82ccd..79e59f635e0 100644
--- a/be/src/exec/operator/sort_source_operator.h
+++ b/be/src/exec/operator/sort_source_operator.h
@@ -43,7 +43,7 @@ public:
 #ifdef BE_TEST
     SortSourceOperatorX() = default;
 #endif
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp 
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 67040e2762d..5bc2bab0d14 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -118,7 +118,7 @@ Status 
SpillIcebergTableSinkOperatorX::prepare(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
-Status SpillIcebergTableSinkOperatorX::sink(RuntimeState* state, Block* 
in_block, bool eos) {
+Status SpillIcebergTableSinkOperatorX::sink_impl(RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.h 
b/be/src/exec/operator/spill_iceberg_table_sink_operator.h
index b754e998896..6da926ae20f 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.h
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.h
@@ -63,7 +63,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
diff --git a/be/src/exec/operator/spill_sort_sink_operator.cpp 
b/be/src/exec/operator/spill_sort_sink_operator.cpp
index f88774d3190..82a1bea731c 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.cpp
+++ b/be/src/exec/operator/spill_sort_sink_operator.cpp
@@ -143,7 +143,7 @@ size_t 
SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
     return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
 }
 
-Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, Block* 
in_block, bool eos) {
+Status SpillSortSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/spill_sort_sink_operator.h 
b/be/src/exec/operator/spill_sort_sink_operator.h
index 19cfa1b9274..cf27bd3765b 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.h
+++ b/be/src/exec/operator/spill_sort_sink_operator.h
@@ -79,7 +79,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
     Status prepare(RuntimeState* state) override;
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         return _sort_sink_operator->required_data_distribution(state);
     }
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp 
b/be/src/exec/operator/spill_sort_source_operator.cpp
index a745bf2858d..c40ab3e255a 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -244,7 +244,7 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) 
{
     return _sort_source_operator->close(state);
 }
 
-Status SpillSortSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status SpillSortSourceOperatorX::get_block_impl(RuntimeState* state, Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
diff --git a/be/src/exec/operator/spill_sort_source_operator.h 
b/be/src/exec/operator/spill_sort_source_operator.h
index 196ab6474b2..dbc8c3120ea 100644
--- a/be/src/exec/operator/spill_sort_source_operator.h
+++ b/be/src/exec/operator/spill_sort_source_operator.h
@@ -81,7 +81,7 @@ public:
 
     Status close(RuntimeState* state) override;
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/operator/tvf_table_sink_operator.h 
b/be/src/exec/operator/tvf_table_sink_operator.h
index 7c20f3c482e..e1a06e675c9 100644
--- a/be/src/exec/operator/tvf_table_sink_operator.h
+++ b/be/src/exec/operator/tvf_table_sink_operator.h
@@ -64,7 +64,7 @@ public:
         return VExpr::open(_output_vexpr_ctxs, state);
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/union_sink_operator.cpp 
b/be/src/exec/operator/union_sink_operator.cpp
index dc6fb2e0d9a..b0f7cf619a8 100644
--- a/be/src/exec/operator/union_sink_operator.cpp
+++ b/be/src/exec/operator/union_sink_operator.cpp
@@ -93,7 +93,7 @@ Status UnionSinkOperatorX::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status UnionSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool 
eos) {
+Status UnionSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block, 
bool eos) {
     auto& local_state = get_local_state(state);
     if (local_state.low_memory_mode()) {
         set_low_memory_mode(state);
diff --git a/be/src/exec/operator/union_sink_operator.h 
b/be/src/exec/operator/union_sink_operator.h
index 14978ae4526..4da532b9f0b 100644
--- a/be/src/exec/operator/union_sink_operator.h
+++ b/be/src/exec/operator/union_sink_operator.h
@@ -99,7 +99,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override {
         if (_cur_child_id > 0) {
diff --git a/be/src/exec/operator/union_source_operator.cpp 
b/be/src/exec/operator/union_source_operator.cpp
index 0efe4ed4efd..6cc39ebb7ce 100644
--- a/be/src/exec/operator/union_source_operator.cpp
+++ b/be/src/exec/operator/union_source_operator.cpp
@@ -100,7 +100,7 @@ std::string UnionSourceLocalState::debug_string(int 
indentation_level) const {
     return fmt::to_string(debug_string_buffer);
 }
 
-Status UnionSourceOperatorX::get_block(RuntimeState* state, Block* block, 
bool* eos) {
+Status UnionSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, 
bool* eos) {
     auto& local_state = get_local_state(state);
     Defer set_eos {[&]() {
         // the eos check of union operator is complex, need check all logical 
if you want modify
diff --git a/be/src/exec/operator/union_source_operator.h 
b/be/src/exec/operator/union_source_operator.h
index 748c9a48e60..01de0cbb05b 100644
--- a/be/src/exec/operator/union_source_operator.h
+++ b/be/src/exec/operator/union_source_operator.h
@@ -68,7 +68,7 @@ public:
 #ifdef BE_TEST
     UnionSourceOperatorX(int child_size) : _child_size(child_size) {}
 #endif
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/exec/pipeline/pipeline_task.cpp 
b/be/src/exec/pipeline/pipeline_task.cpp
index fe75d7b499d..dfbb0955145 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -647,7 +647,6 @@ Status PipelineTask::execute(bool* done) {
 
             bool eos = false;
             RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
&eos));
-            RETURN_IF_ERROR(block->check_type_and_column());
             _eos = eos;
         }
 
@@ -718,7 +717,7 @@ Status PipelineTask::execute(bool* done) {
                     }
                 }
             });
-            RETURN_IF_ERROR(block->check_type_and_column());
+
             status = _sink->sink(_state, block, _eos);
 
             if (_eos) {
diff --git a/be/test/exec/operator/agg_operator_test.cpp 
b/be/test/exec/operator/agg_operator_test.cpp
index 02b6a79bb72..bac6f0da381 100644
--- a/be/test/exec/operator/agg_operator_test.cpp
+++ b/be/test/exec/operator/agg_operator_test.cpp
@@ -104,7 +104,7 @@ class MockDistributionOperator final : public 
OperatorX<MockLocalState> {
 public:
     MockDistributionOperator(ExchangeType exchange_type) : 
_exchange_type(exchange_type) {}
 
-    Status get_block(RuntimeState* /*state*/, Block* /*block*/, bool* eos) 
override {
+    Status get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* 
eos) override {
         *eos = true;
         return Status::OK();
     }
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp 
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index a64d16c676f..bd865b9aca9 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -40,7 +40,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/exec/operator/partition_sort_sink_operator_test.cpp 
b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
index 744ca8e8452..36a90bf5a38 100644
--- a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
@@ -37,7 +37,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/exec/operator/partitioned_aggregation_test_helper.h 
b/be/test/exec/operator/partitioned_aggregation_test_helper.h
index 5ecfe8dd297..da0881e84ea 100644
--- a/be/test/exec/operator/partitioned_aggregation_test_helper.h
+++ b/be/test/exec/operator/partitioned_aggregation_test_helper.h
@@ -83,7 +83,9 @@ public:
         return Status::OK();
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override { 
return Status::OK(); }
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
+        return Status::OK();
+    }
 };
 
 class MockPartitionedAggLocalState : public PartitionedAggLocalState {
diff --git a/be/test/exec/operator/partitioned_hash_join_test_helper.h 
b/be/test/exec/operator/partitioned_hash_join_test_helper.h
index 9dcbb7335f5..ba4ff661288 100644
--- a/be/test/exec/operator/partitioned_hash_join_test_helper.h
+++ b/be/test/exec/operator/partitioned_hash_join_test_helper.h
@@ -115,7 +115,9 @@ public:
         return Status::OK();
     }
 
-    Status sink(RuntimeState* state, Block* in_block, bool eos) override { 
return Status::OK(); }
+    Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
+        return Status::OK();
+    }
 
     std::string get_memory_usage_debug_str(RuntimeState* state) const override 
{ return "mock"; }
 };
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp 
b/be/test/exec/operator/query_cache_operator_test.cpp
index a99e9bcb9d9..91c73b99077 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -36,7 +36,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/exec/operator/sort_operator_test.cpp 
b/be/test/exec/operator/sort_operator_test.cpp
index 310f3ffb4b6..0a26844b3bc 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -37,7 +37,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/exec/operator/spill_sort_test_helper.h 
b/be/test/exec/operator/spill_sort_test_helper.h
index c887212b2fd..81ca44ce2bd 100644
--- a/be/test/exec/operator/spill_sort_test_helper.h
+++ b/be/test/exec/operator/spill_sort_test_helper.h
@@ -53,7 +53,7 @@ public:
                             const DescriptorTbl& descs)
             : SortSourceOperatorX(pool, tnode, operator_id, descs) {}
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         std::swap(*block, this->block);
         *eos = this->eos;
         return Status::OK();
diff --git a/be/test/exec/operator/streaming_agg_operator_test.cpp 
b/be/test/exec/operator/streaming_agg_operator_test.cpp
index bbe54ebec5d..4596f040b03 100644
--- a/be/test/exec/operator/streaming_agg_operator_test.cpp
+++ b/be/test/exec/operator/streaming_agg_operator_test.cpp
@@ -66,7 +66,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/exec/operator/table_function_operator_test.cpp 
b/be/test/exec/operator/table_function_operator_test.cpp
index 913687414be..72e189d0fa8 100644
--- a/be/test/exec/operator/table_function_operator_test.cpp
+++ b/be/test/exec/operator/table_function_operator_test.cpp
@@ -53,7 +53,9 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override { 
return Status::OK(); }
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
+        return Status::OK();
+    }
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override {
         return Status::OK();
     }
diff --git a/be/test/testutil/mock/mock_operators.h 
b/be/test/testutil/mock/mock_operators.h
index bba11eb7473..1077a767018 100644
--- a/be/test/testutil/mock/mock_operators.h
+++ b/be/test/testutil/mock/mock_operators.h
@@ -34,7 +34,7 @@ public:
         return Status::OK();
     }
 
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
         block->swap(_block);
         *eos = _eos;
         return Status::OK();
@@ -57,7 +57,7 @@ public:
 
 class MockSinkOperator final : public DataSinkOperatorXBase {
 public:
-    Status sink(RuntimeState* state, Block* block, bool eos) override { return 
Status::OK(); }
+    Status sink_impl(RuntimeState* state, Block* block, bool eos) override { 
return Status::OK(); }
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override {
         return Status::OK();
diff --git a/be/test/util/profile_spec_test.cpp 
b/be/test/util/profile_spec_test.cpp
index 68a535c9cd0..172427395d0 100644
--- a/be/test/util/profile_spec_test.cpp
+++ b/be/test/util/profile_spec_test.cpp
@@ -97,7 +97,7 @@ private:
         Status prepare(RuntimeState* state) override { return Status::OK(); }
         Status open(RuntimeState* state) { return Status::OK(); }
         Status close(RuntimeState* state) override { return Status::OK(); }
-        Status get_block(RuntimeState* state, Block* block, bool* eos) 
override {
+        Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
             return Status::OK();
         }
     };
@@ -113,7 +113,7 @@ private:
 
         Status prepare(RuntimeState* state) override { return Status::OK(); }
         Status close(RuntimeState* state) override { return Status::OK(); }
-        Status get_block(RuntimeState* state, Block* block, bool* eos) 
override {
+        Status get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override {
             *eos = true;
             block->swap(_block);
             return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to