This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0a31a3c9745b235226751e3f80bd14254f163852
Author: Gabriel <[email protected]>
AuthorDate: Mon Apr 15 20:16:37 2024 +0800

    [refactor](pipelineX) Reduce prepare overhead (PART I) (#33550)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 ++++++-------
 .../pipeline/exec/aggregation_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    | 14 ++--
 be/src/pipeline/exec/analytic_sink_operator.h      |  1 +
 be/src/pipeline/exec/analytic_source_operator.cpp  | 14 ++--
 be/src/pipeline/exec/analytic_source_operator.h    |  1 +
 .../distinct_streaming_aggregation_operator.cpp    | 23 ++++---
 .../exec/distinct_streaming_aggregation_operator.h |  1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 47 +++++++++-----
 be/src/pipeline/exec/exchange_sink_operator.h      | 12 ++--
 be/src/pipeline/exec/exchange_source_operator.cpp  | 14 ++--
 be/src/pipeline/exec/repeat_operator.cpp           |  4 +-
 be/src/pipeline/exec/repeat_operator.h             |  2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  9 +--
 be/src/pipeline/exec/result_sink_operator.h        |  4 +-
 be/src/pipeline/exec/scan_operator.cpp             | 23 +++----
 be/src/pipeline/exec/schema_scan_operator.cpp      |  5 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   | 18 ++++--
 be/src/pipeline/exec/set_probe_sink_operator.h     |  1 +
 be/src/pipeline/exec/set_sink_operator.cpp         | 28 +++++---
 be/src/pipeline/exec/set_sink_operator.h           |  1 +
 be/src/pipeline/exec/set_source_operator.cpp       |  4 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        | 14 ++--
 be/src/pipeline/exec/sort_sink_operator.h          |  5 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  9 ++-
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  1 -
 .../pipeline/exec/spill_sort_source_operator.cpp   |  4 ++
 .../exec/streaming_aggregation_operator.cpp        | 28 +++++---
 .../pipeline/exec/streaming_aggregation_operator.h |  1 +
 be/src/pipeline/exec/table_function_operator.cpp   |  6 +-
 be/src/pipeline/exec/table_function_operator.h     |  2 +-
 be/src/pipeline/exec/union_sink_operator.cpp       | 12 +++-
 be/src/pipeline/exec/union_sink_operator.h         |  1 +
 be/src/pipeline/exec/union_source_operator.cpp     | 17 +++--
 be/src/pipeline/exec/union_source_operator.h       |  1 +
 be/src/pipeline/pipeline_x/operator.cpp            |  2 +
 be/src/pipeline/pipeline_x/operator.h              |  2 +
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 15 ++---
 be/src/vec/runtime/vdata_stream_mgr.cpp            | 17 +++--
 be/src/vec/runtime/vdata_stream_mgr.h              |  4 +-
 be/src/vec/sink/vdata_stream_sender.cpp            | 75 ++++++++++++++++++++--
 be/src/vec/sink/vdata_stream_sender.h              |  7 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 11 ----
 44 files changed, 332 insertions(+), 180 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 6e0042da7d2..e29d6de2860 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -63,22 +63,9 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState
 Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(Base::exec_time_counter());
-    SCOPED_TIMER(Base::_open_timer);
+    SCOPED_TIMER(Base::_init_timer);
     _agg_data = Base::_shared_state->agg_data.get();
     _agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
-    auto& p = Base::_parent->template cast<AggSinkOperatorX>();
-    Base::_shared_state->align_aggregate_states = p._align_aggregate_states;
-    Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states;
-    Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states;
-    Base::_shared_state->make_nullable_keys = p._make_nullable_keys;
-    for (auto& evaluator : p._aggregate_evaluators) {
-        Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
-    }
-    Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
-    for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(
-                p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i]));
-    }
     _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
                                                             TUnit::BYTES, 
"MemoryUsage", 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
@@ -95,12 +82,30 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);
-    COUNTER_SET(_max_row_size_counter, (int64_t)0);
 
+    return Status::OK();
+}
+
+Status AggSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    auto& p = Base::_parent->template cast<AggSinkOperatorX>();
+    Base::_shared_state->align_aggregate_states = p._align_aggregate_states;
+    Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states;
+    Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states;
+    Base::_shared_state->make_nullable_keys = p._make_nullable_keys;
+    for (auto& evaluator : p._aggregate_evaluators) {
+        Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i]));
+    }
     for (auto& evaluator : Base::_shared_state->aggregate_evaluators) {
         evaluator->set_timer(_merge_timer, _expr_timer);
     }
-
     Base::_shared_state->agg_profile_arena = std::make_unique<vectorized::Arena>();
 
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
@@ -139,15 +144,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
                                (!p._have_conjuncts) && // no having conjunct
                                p._needs_finalize;      // agg's finalize step
     }
-
-    return Status::OK();
-}
-
-Status AggSinkLocalState::open(RuntimeState* state) {
-    SCOPED_TIMER(Base::exec_time_counter());
-    SCOPED_TIMER(Base::_open_timer);
-    RETURN_IF_ERROR(Base::open(state));
-    _agg_data = Base::_shared_state->agg_data.get();
     // move _create_agg_status to open not in during prepare,
     // because during prepare and open thread is not the same one,
     // this could cause unable to get JVM
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 9c47f1c8cb3..cff6f9fec42 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -625,7 +625,7 @@ Status AggLocalState::close(RuntimeState* state) {
     }
 
     /// _hash_table_size_counter may be null if prepare failed.
-    if (_hash_table_size_counter) {
+    if (_hash_table_size_counter && _shared_state->ready_to_execute) {
         std::visit(
                 [&](auto&& agg_method) {
                     COUNTER_SET(_hash_table_size_counter, 
int64_t(agg_method.hash_table->size()));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 26f0b0812f7..a1d3384edc6 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -29,15 +29,21 @@ OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, 
StreamingOperator)
 Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    _blocks_memory_usage =
+            _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+    return Status::OK();
+}
+
+Status AnalyticSinkLocalState::open(RuntimeState* state) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::open(state));
+    SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<AnalyticSinkOperatorX>();
     _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
     _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
 
-    _blocks_memory_usage =
-            _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
-    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
-
     size_t agg_size = p._agg_expr_ctxs.size();
     _agg_expr_ctxs.resize(agg_size);
     _shared_state->agg_input_columns.resize(agg_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index e04259a0fc4..3ae4a7b5cff 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -55,6 +55,7 @@ public:
             : PipelineXSinkLocalState<AnalyticSharedState>(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
 private:
     friend class AnalyticSinkOperatorX;
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index d8befd152a2..f6658583d46 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -161,16 +161,22 @@ bool AnalyticLocalState::_whether_need_next_partition(
 Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    _blocks_memory_usage =
+            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+    return Status::OK();
+}
+
+Status AnalyticLocalState::open(RuntimeState* state) {
+    RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
+    SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _agg_arena_pool = std::make_unique<vectorized::Arena>();
 
     auto& p = _parent->cast<AnalyticSourceOperatorX>();
     _agg_functions_size = p._agg_functions.size();
 
-    _blocks_memory_usage =
-            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
-    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
-
     _agg_functions.resize(p._agg_functions.size());
     for (size_t i = 0; i < _agg_functions.size(); i++) {
         _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index cdfe2644f45..17a4d34ec73 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -53,6 +53,7 @@ public:
     AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
     void init_result_columns();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 9151f4a29d5..064b9532878 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -68,8 +68,23 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
 
 Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_init_timer);
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
+    _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
+
+    return Status::OK();
+}
+
+Status DistinctStreamingAggLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
     for (auto& evaluator : p._aggregate_evaluators) {
         _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
@@ -79,14 +94,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* 
state, LocalStateInfo&
         RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
     }
 
-    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
-    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
-    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
-    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
-    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
-    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
-    _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
-
     if (_probe_expr_ctxs.empty()) {
         _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
                 _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 125f176375b..d2246a2eaa2 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -43,6 +43,7 @@ public:
     DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent);
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
 private:
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 07c7130894a..2c37f24eac4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -99,10 +99,12 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() 
const {
     return _parent->cast<ExchangeSinkOperatorX>()._transfer_large_data_by_brpc;
 }
 
+static const std::string timer_name = "WaitForDependencyTime";
+
 Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _sender_id = info.sender_id;
 
     _bytes_sent_counter = ADD_COUNTER(_profile, "BytesSent", TUnit::BYTES);
@@ -125,7 +127,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             "");
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", 
TUnit::BYTES);
-    static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
     _wait_queue_timer =
             ADD_CHILD_TIMER_WITH_LEVEL(_profile, "WaitForRpcBufferQueue", 
timer_name, 1);
@@ -150,9 +151,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     }
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
+    // Make sure brpc stub is ready before execution.
+    for (int i = 0; i < channels.size(); ++i) {
+        RETURN_IF_ERROR(channels[i]->init_stub(state));
+    }
+    return Status::OK();
+}
+
+Status ExchangeSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    _part_type = p._part_type;
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+
     int local_size = 0;
     for (int i = 0; i < channels.size(); ++i) {
-        RETURN_IF_ERROR(channels[i]->init(state));
+        RETURN_IF_ERROR(channels[i]->open(state));
         if (channels[i]->is_local()) {
             local_size++;
         }
@@ -192,15 +208,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                 ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", 
timer_name);
     } else if (local_size > 0) {
         size_t dep_id = 0;
-        _local_channels_dependency.resize(local_size);
-        _wait_channel_timer.resize(local_size);
         for (auto* channel : channels) {
             if (channel->is_local()) {
-                _local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
+                _local_channels_dependency.push_back(channel->get_local_channel_dependency());
                 DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                _wait_channel_timer[dep_id] = _profile->add_nonzero_counter(
+                _wait_channel_timer.push_back(_profile->add_nonzero_counter(
                         fmt::format("WaitForLocalExchangeBuffer{}", dep_id), 
TUnit ::TIME_NS,
-                        timer_name, 1);
+                        timer_name, 1));
                 dep_id++;
             }
         }
@@ -282,12 +296,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     }
 
     _finish_dependency->block();
-
-    return Status::OK();
-}
-
-Status ExchangeSinkLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
     if (_part_type == TPartitionType::HASH_PARTITIONED ||
         _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
         _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
@@ -685,7 +693,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     }
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_close_timer);
-    COUNTER_UPDATE(_wait_queue_timer, 
_queue_dependency->watcher_elapse_time());
+    if (_queue_dependency) {
+        COUNTER_UPDATE(_wait_queue_timer, 
_queue_dependency->watcher_elapse_time());
+    }
+
     COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     if (_broadcast_dependency) {
         COUNTER_UPDATE(_wait_broadcast_buffer_timer, 
_broadcast_dependency->watcher_elapse_time());
@@ -694,8 +705,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
         COUNTER_UPDATE(_wait_channel_timer[i],
                        _local_channels_dependency[i]->watcher_elapse_time());
     }
-    _sink_buffer->update_profile(profile());
-    _sink_buffer->close();
+    if (_sink_buffer) {
+        _sink_buffer->update_profile(profile());
+        _sink_buffer->close();
+    }
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 17878fc6ead..9c40242cd03 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -158,7 +158,7 @@ private:
     friend class vectorized::PipChannel<ExchangeSinkLocalState>;
     friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
 
-    std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer;
+    std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer = 
nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
     RuntimeProfile::Counter* _compress_timer = nullptr;
     RuntimeProfile::Counter* _brpc_send_timer = nullptr;
@@ -268,7 +268,7 @@ private:
                                      vectorized::Block* block, bool eos);
     RuntimeState* _state = nullptr;
 
-    const std::vector<TExpr>& _texprs;
+    const std::vector<TExpr> _texprs;
 
     const RowDescriptor& _row_desc;
 
@@ -291,10 +291,10 @@ private:
     segment_v2::CompressionTypePB _compression_type;
 
     // for tablet sink shuffle
-    const TOlapTableSchemaParam& _tablet_sink_schema;
-    const TOlapTablePartitionParam& _tablet_sink_partition;
-    const TOlapTableLocationParam& _tablet_sink_location;
-    const TTupleId& _tablet_sink_tuple_id;
+    const TOlapTableSchemaParam _tablet_sink_schema;
+    const TOlapTablePartitionParam _tablet_sink_partition;
+    const TOlapTableLocationParam _tablet_sink_location;
+    const TTupleId _tablet_sink_tuple_id;
     int64_t _tablet_sink_txn_id = -1;
     std::shared_ptr<ObjectPool> _pool;
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index fd44b23995d..a23dae6dd62 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -69,7 +69,7 @@ std::string ExchangeSourceOperatorX::debug_string(int 
indentation_level) const {
 Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
@@ -77,19 +77,16 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
+    static const std::string timer_name = "WaitForDependencyTime";
+    _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
     for (size_t i = 0; i < queues.size(); i++) {
         deps[i] = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                             "SHUFFLE_DATA_DEPENDENCY", 
state->get_query_ctx());
         queues[i]->set_dependency(deps[i]);
-    }
-    static const std::string timer_name = "WaitForDependencyTime";
-    _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
-    for (size_t i = 0; i < queues.size(); i++) {
         metrics[i] = 
_runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
                                                            TUnit ::TIME_NS, 
timer_name, 1);
     }
-    RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
-            state, vsort_exec_exprs));
+
     return Status::OK();
 }
 
@@ -97,6 +94,9 @@ Status ExchangeLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
+
+    RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
+            state, vsort_exec_exprs));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index 0f9cf93b3f5..42d009f0e76 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -47,10 +47,10 @@ RepeatLocalState::RepeatLocalState(RuntimeState* state, 
OperatorXBase* parent)
           _child_block(vectorized::Block::create_unique()),
           _repeat_id_idx(0) {}
 
-Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) {
-    RETURN_IF_ERROR(Base::init(state, info));
+Status RepeatLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<Parent>();
     _expr_ctxs.resize(p._expr_ctxs.size());
     for (size_t i = 0; i < _expr_ctxs.size(); i++) {
diff --git a/be/src/pipeline/exec/repeat_operator.h 
b/be/src/pipeline/exec/repeat_operator.h
index 9cb671fccb0..208b3d1e005 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -54,7 +54,7 @@ public:
     using Base = PipelineXLocalState<FakeSharedState>;
     RepeatLocalState(RuntimeState* state, OperatorXBase* parent);
 
-    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
     Status get_repeated_block(vectorized::Block* child_block, int 
repeat_id_idx,
                               vectorized::Block* output_block);
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index b89ce4adb2e..d0cd130cc8a 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -53,17 +53,18 @@ bool ResultSinkOperator::can_write() {
 Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
     auto fragment_instance_id = state->fragment_instance_id();
+
+    _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", 
TUnit::UNIT, 1);
+    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", 
TUnit::UNIT, 1);
+
     // create sender
-    std::shared_ptr<BufferControlBlock> sender = nullptr;
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), 
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
-    _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
-    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 564f05eef43..aed9961a6d6 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -62,8 +62,8 @@ private:
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
-    std::shared_ptr<BufferControlBlock> _sender;
-    std::shared_ptr<ResultWriter> _writer;
+    std::shared_ptr<BufferControlBlock> _sender = nullptr;
+    std::shared_ptr<ResultWriter> _writer = nullptr;
     RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
     RuntimeProfile::Counter* _rows_sent_counter = nullptr;
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index baff3df2d37..94d07f8c0d6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -110,19 +110,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
             _runtime_profile, "WaitForDependency[" + _scan_dependency->name() 
+ "]Time", 1);
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, 
p.ignore_data_distribution()));
-
-    _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
-    for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
-        RETURN_IF_ERROR(
-                p._common_expr_ctxs_push_down[i]->clone(state, 
_common_expr_ctxs_push_down[i]));
-    }
-    _stale_expr_ctxs.resize(p._stale_expr_ctxs.size());
-    for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, 
_stale_expr_ctxs[i]));
-    }
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
     init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), 
p.node_id(),
@@ -149,7 +139,18 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
+    RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
+    auto& p = _parent->cast<typename Derived::Parent>();
+    _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
+    for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
+        RETURN_IF_ERROR(
+                p._common_expr_ctxs_push_down[i]->clone(state, 
_common_expr_ctxs_push_down[i]));
+    }
     RETURN_IF_ERROR(_acquire_runtime_filter(true));
+    _stale_expr_ctxs.resize(p._stale_expr_ctxs.size());
+    for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, 
_stale_expr_ctxs[i]));
+    }
     RETURN_IF_ERROR(_process_conjuncts());
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 20eec02bc13..2d32e21d991 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -48,7 +48,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
 
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<SchemaScanOperatorX>();
     _scanner_param.common_param = p._common_scanner_param;
     // init schema scanner profile
@@ -69,6 +69,9 @@ Status SchemaScanLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
 }
 
 Status SchemaScanLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
     return _schema_scanner->start(state);
 }
 
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 76248f2c75c..9ae40930d5f 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -137,23 +137,31 @@ Status 
SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
 
 template <bool is_intersect>
 Status SetProbeSinkLocalState<is_intersect>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, 
info));
+    RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     Parent& parent = _parent->cast<Parent>();
     _shared_state->probe_finished_children_dependency[parent._cur_child_id] = 
_dependency;
     _dependency->block();
+
     _child_exprs.resize(parent._child_exprs.size());
     for (size_t i = 0; i < _child_exprs.size(); i++) {
         RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
     }
-
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
     child_exprs_lists[parent._cur_child_id] = _child_exprs;
+    return Status::OK();
+}
+
+template <bool is_intersect>
+Status SetProbeSinkLocalState<is_intersect>::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
 
     // Add the if check only for compatible with old optimiser
-    if (child_exprs_lists.size() > 1) {
-        _probe_columns.resize(child_exprs_lists[1].size());
+    if (_shared_state->child_quantity > 1) {
+        _probe_columns.resize(_child_exprs.size());
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 6b4197ea94b..9f80f03966b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -81,6 +81,7 @@ public:
             : Base(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     int64_t* valid_element_in_hash_tbl() { return 
&_shared_state->valid_element_in_hash_tbl; }
 
 private:
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 7ef4871555d..3b6de314060 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -162,28 +162,38 @@ template <bool is_intersect>
 Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _build_timer = ADD_TIMER(_profile, "BuildTime");
-
     auto& parent = _parent->cast<Parent>();
     _shared_state->probe_finished_children_dependency[parent._cur_child_id] = 
_dependency;
+    DCHECK(parent._cur_child_id == 0);
+    auto& child_exprs_lists = _shared_state->child_exprs_lists;
+    DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == 
parent._child_quantity);
+    if (child_exprs_lists.empty()) {
+        child_exprs_lists.resize(parent._child_quantity);
+    }
     _child_exprs.resize(parent._child_exprs.size());
     for (size_t i = 0; i < _child_exprs.size(); i++) {
         RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
     }
-
+    child_exprs_lists[parent._cur_child_id] = _child_exprs;
     _shared_state->child_quantity = parent._child_quantity;
+    return Status::OK();
+}
 
+template <bool is_intersect>
+Status SetSinkLocalState<is_intersect>::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::open(state));
+
+    auto& parent = _parent->cast<Parent>();
+    DCHECK(parent._cur_child_id == 0);
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
-    DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == 
parent._child_quantity);
-    if (child_exprs_lists.empty()) {
-        child_exprs_lists.resize(parent._child_quantity);
-    }
-    child_exprs_lists[parent._cur_child_id] = _child_exprs;
 
     _shared_state->hash_table_variants = 
std::make_unique<vectorized::SetHashTableVariants>();
 
-    for (const auto& ctx : child_exprs_lists[0]) {
+    for (const auto& ctx : child_exprs_lists[parent._cur_child_id]) {
         
_shared_state->build_not_ignore_null.push_back(ctx->root()->is_nullable());
     }
     _shared_state->hash_table_init();
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 24f23593ea0..2a6bb63c02e 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -73,6 +73,7 @@ public:
     SetSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
 private:
     friend class SetSinkOperatorX<is_intersect>;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 15524a25a7b..97ad66a867e 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -53,7 +53,7 @@ template <bool is_intersect>
 Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _shared_state->probe_finished_children_dependency.resize(
             _parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, 
nullptr);
     return Status::OK();
@@ -63,7 +63,7 @@ template <bool is_intersect>
 Status SetSourceLocalState<is_intersect>::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    RETURN_IF_ERROR(PipelineXLocalState<SetSharedState>::open(state));
+    RETURN_IF_ERROR(Base::open(state));
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
 
     auto output_data_types = vectorized::VectorizedUtils::get_data_types(
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 687332e1aec..d89e54614d1 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -29,9 +29,18 @@ namespace doris::pipeline {
 OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator)
 
 Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) 
{
-    RETURN_IF_ERROR(PipelineXSinkLocalState<SortSharedState>::init(state, 
info));
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    _sort_blocks_memory_usage =
+            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 
"MemoryUsage", 1);
+    return Status::OK();
+}
+
+Status SortSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<SortSinkOperatorX>();
 
     RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
@@ -62,9 +71,6 @@ Status SortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     _shared_state->sorter->init_profile(_profile);
 
     _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true");
-
-    _sort_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 
"MemoryUsage", 1);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index f23437f72f9..ad9c23401b4 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -51,12 +51,13 @@ class SortSinkOperatorX;
 
 class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
     ENABLE_FACTORY_CREATOR(SortSinkLocalState);
+    using Base = PipelineXSinkLocalState<SortSharedState>;
 
 public:
-    SortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : PipelineXSinkLocalState<SortSharedState>(parent, state) {}
+    SortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
 private:
     friend class SortSinkOperatorX;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 78c5c9f51e9..e83434250e0 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -27,11 +27,12 @@ 
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent,
             parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
             state->get_query_ctx());
 }
+
 Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
                                      doris::pipeline::LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
 
     _init_counters();
 
@@ -45,6 +46,7 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* 
state,
     }
     return Status::OK();
 }
+
 void SpillSortSinkLocalState::_init_counters() {
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
 
@@ -72,10 +74,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* 
child_profile) {
     UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime");
     UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks");
 }
-Status SpillSortSinkLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
-    return Status::OK();
-}
+
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status 
execsink_status) {
     auto& parent = Base::_parent->template cast<Parent>();
     if (parent._enable_spill) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index d66215411aa..4604696eff2 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -35,7 +35,6 @@ public:
     ~SpillSortSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 5edb0daf7fc..707a1c3f5a1 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -31,6 +31,8 @@ SpillSortLocalState::SpillSortLocalState(RuntimeState* state, 
OperatorXBase* par
 }
 Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
     _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillMergeSortTime", "Spill", 1);
     _spill_merge_sort_timer =
@@ -49,6 +51,8 @@ Status SpillSortLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
 }
 
 Status SpillSortLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
     if (_opened) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 1ec283bdc1f..dfcfb0ebc45 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -83,15 +83,7 @@ StreamingAggLocalState::StreamingAggLocalState(RuntimeState* 
state, OperatorXBas
 Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) 
{
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(Base::exec_time_counter());
-    SCOPED_TIMER(Base::_open_timer);
-    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-    for (auto& evaluator : p._aggregate_evaluators) {
-        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
-    }
-    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
-    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
-    }
+    SCOPED_TIMER(Base::_init_timer);
     _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
                                                             TUnit::BYTES, 
"MemoryUsage", 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
@@ -119,7 +111,23 @@ Status StreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");
     _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
     _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
-    COUNTER_SET(_max_row_size_counter, (int64_t)0);
+
+    return Status::OK();
+}
+
+Status StreamingAggLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
 
     for (auto& evaluator : _aggregate_evaluators) {
         evaluator->set_timer(_merge_timer, _expr_timer);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 1ccb7e31d0f..2895fc63f39 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -42,6 +42,7 @@ public:
     ~StreamingAggLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* 
output_block);
     void make_nullable_output_key(vectorized::Block* block);
diff --git a/be/src/pipeline/exec/table_function_operator.cpp 
b/be/src/pipeline/exec/table_function_operator.cpp
index bd4bc3f90d6..b4d993ef035 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -44,8 +44,10 @@ Status TableFunctionOperator::close(doris::RuntimeState* 
state) {
 TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : PipelineXLocalState<>(state, parent), 
_child_block(vectorized::Block::create_unique()) {}
 
-Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& 
info) {
-    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+Status TableFunctionLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(PipelineXLocalState<>::exec_time_counter());
+    SCOPED_TIMER(PipelineXLocalState<>::_open_timer);
+    RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
     auto& p = _parent->cast<TableFunctionOperatorX>();
     _vfn_ctxs.resize(p._vfn_ctxs.size());
     for (size_t i = 0; i < _vfn_ctxs.size(); i++) {
diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index 3379a8f5b4a..49dd242bfe7 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -55,7 +55,7 @@ public:
     TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent);
     ~TableFunctionLocalState() override = default;
 
-    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     void process_next_child_row();
     Status get_expanded_block(RuntimeState* state, vectorized::Block* 
output_block, bool* eos);
 
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index ce1195f042b..5acf6c8e1a2 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -95,16 +95,24 @@ Status UnionSinkOperator::close(RuntimeState* state) {
 
 Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    auto& p = _parent->cast<Parent>();
+    _shared_state->data_queue.set_sink_dependency(_dependency, 
p._cur_child_id);
+    return Status::OK();
+}
+
+Status UnionSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<Parent>();
     _child_expr.resize(p._child_expr.size());
-    _shared_state->data_queue.set_sink_dependency(_dependency, 
p._cur_child_id);
     for (size_t i = 0; i < p._child_expr.size(); i++) {
         RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
     }
     return Status::OK();
-};
+}
 
 UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* 
pool,
                                        const TPlanNode& tnode, const 
DescriptorTbl& descs)
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 6d79d3f2a9f..97b704078c6 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -73,6 +73,7 @@ public:
     UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _child_row_idx(0) {}
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     friend class UnionSinkOperatorX;
     using Base = PipelineXSinkLocalState<UnionSharedState>;
     using Parent = UnionSinkOperatorX;
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index de0348de508..10f98a8d1cb 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -110,7 +110,7 @@ Status UnionSourceOperator::get_block(RuntimeState* state, 
vectorized::Block* bl
 Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
     if (p.get_child_count() != 0) {
         ((UnionSharedState*)_dependency->shared_state())
@@ -124,6 +124,18 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                 _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
     }
 
+    if (p.get_child_count() == 0) {
+        _dependency->set_ready();
+    }
+    return Status::OK();
+}
+
+Status UnionSourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& p = _parent->cast<Parent>();
     // Const exprs materialized by this node. These exprs don't refer to any 
children.
     // Only materialized by the first fragment instance to avoid duplication.
     if (state->per_fragment_instance_idx() == 0) {
@@ -143,9 +155,6 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         }
     }
 
-    if (p.get_child_count() == 0) {
-        _dependency->set_ready();
-    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 023e6363d48..60530521ec0 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -78,6 +78,7 @@ public:
     UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
 
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 08f0c4b73cc..a468729765f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -415,6 +415,7 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _blocks_returned_counter =
             ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", 
TUnit::UNIT, 1);
     _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
"ProjectionTime", 1);
+    _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1);
     _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
@@ -467,6 +468,7 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
         _dependency = nullptr;
     }
     _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", 
TUnit::UNIT, 1);
+    _init_timer = ADD_TIMER_WITH_LEVEL(_profile, "InitTime", 1);
     _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
     _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 7a0a5d12172..2d1f1611b66 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -128,6 +128,7 @@ protected:
     RuntimeProfile::Counter* _exec_timer = nullptr;
     // Account for peak memory used by this node
     RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
+    RuntimeProfile::Counter* _init_timer = nullptr;
     RuntimeProfile::Counter* _open_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
 
@@ -542,6 +543,7 @@ protected:
             std::make_unique<RuntimeProfile>("faker profile");
 
     RuntimeProfile::Counter* _rows_input_counter = nullptr;
+    RuntimeProfile::Counter* _init_timer = nullptr;
     RuntimeProfile::Counter* _open_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 2627b56fe7d..da3da7404f6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -105,9 +105,11 @@ Status PipelineXTask::prepare(const 
TPipelineInstanceParams& local_params, const
         query_ctx->register_query_statistics(
                 
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
-
-    _block = doris::vectorized::Block::create_unique();
-    RETURN_IF_ERROR(_extract_dependencies());
+    {
+        const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
+        std::copy(deps.begin(), deps.end(),
+                  std::inserter(_filter_dependencies, 
_filter_dependencies.end()));
+    }
     // We should make sure initial state for task are runnable so that we can 
do some preparation jobs (e.g. initialize runtime filters).
     set_state(PipelineTaskState::RUNNABLE);
     _prepared = true;
@@ -139,11 +141,6 @@ Status PipelineXTask::_extract_dependencies() {
             _finish_dependencies.push_back(fin_dep);
         }
     }
-    {
-        const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
-        std::copy(deps.begin(), deps.end(),
-                  std::inserter(_filter_dependencies, 
_filter_dependencies.end()));
-    }
     return Status::OK();
 }
 
@@ -196,6 +193,8 @@ Status PipelineXTask::_open() {
         RETURN_IF_ERROR(local_state->open(_state));
     }
     RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
+    RETURN_IF_ERROR(_extract_dependencies());
+    _block = doris::vectorized::Block::create_unique();
     _opened = true;
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 75ac55dac50..46d335fbf00 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -76,9 +76,8 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::create_recvr(
     return recvr;
 }
 
-std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& 
fragment_instance_id,
-                                                             PlanNodeId 
node_id,
-                                                             bool 
acquire_lock) {
+Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, 
PlanNodeId node_id,
+                                  std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock) {
     VLOG_ROW << "looking up fragment_instance_id=" << 
print_id(fragment_instance_id)
              << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
@@ -93,11 +92,13 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::find_recvr(const TUniqueId& fr
         auto recvr = range.first->second;
         if (recvr->fragment_instance_id() == fragment_instance_id &&
             recvr->dest_node_id() == node_id) {
-            return recvr;
+            *res = recvr;
+            return Status::OK();
         }
         ++range.first;
     }
-    return nullptr;
+    return Status::InternalError("Could not find local receiver for node {} 
with instance {}",
+                                 node_id, print_id(fragment_instance_id));
 }
 
 Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
@@ -106,7 +107,8 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
     TUniqueId t_finst_id;
     t_finst_id.hi = finst_id.hi();
     t_finst_id.lo = finst_id.lo();
-    auto recvr = find_recvr(t_finst_id, request->node_id());
+    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
+    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
     if (recvr == nullptr) {
         // The receiver may remove itself from the receiver map via 
deregister_recvr()
         // at any time without considering the remaining number of senders.
@@ -191,7 +193,8 @@ void VDataStreamMgr::cancel(const TUniqueId& 
fragment_instance_id, Status exec_s
         FragmentStreamSet::iterator i =
                 
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
         while (i != _fragment_stream_set.end() && i->first == 
fragment_instance_id) {
-            std::shared_ptr<VDataStreamRecvr> recvr = find_recvr(i->first, 
i->second, false);
+            std::shared_ptr<VDataStreamRecvr> recvr;
+            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
             if (recvr == nullptr) {
                 // keep going but at least log it
                 std::stringstream err;
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index 853d9846211..43605bb6abd 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -55,8 +55,8 @@ public:
                                                    PlanNodeId dest_node_id, 
int num_senders,
                                                    RuntimeProfile* profile, 
bool is_merging);
 
-    std::shared_ptr<VDataStreamRecvr> find_recvr(const TUniqueId& 
fragment_instance_id,
-                                                 PlanNodeId node_id, bool 
acquire_lock = true);
+    Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id,
+                      std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock = true);
 
     Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id);
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 682f9cc2a1c..c220a76317a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -80,6 +80,55 @@ Status Channel<Parent>::init(RuntimeState* state) {
 
     _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
 
+    if (state->query_options().__isset.enable_local_exchange) {
+        _is_local &= state->query_options().enable_local_exchange;
+    }
+
+    if (_is_local) {
+        
RETURN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                _fragment_instance_id, _dest_node_id, &_local_recvr));
+    } else {
+        if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
+            _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
+                    "127.0.0.1", _brpc_dest_addr.port);
+        } else {
+            _brpc_stub =
+                    
state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
+        }
+
+        if (!_brpc_stub) {
+            std::string msg = fmt::format("Get rpc stub failed, 
dest_addr={}:{}",
+                                          _brpc_dest_addr.hostname, 
_brpc_dest_addr.port);
+            LOG(WARNING) << msg;
+            return Status::InternalError(msg);
+        }
+    }
+
+    _serializer.set_is_local(_is_local);
+
+    // In bucket shuffle join, fragment_instance_id is set to (-1, -1)
+    // to build a camouflaged empty channel whose ip and port are '0.0.0.0:0',
+    // so the empty channel does not need to call close_internal().
+    _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo 
!= -1);
+    _state = state;
+    return Status::OK();
+}
+
+template <typename Parent>
+Status Channel<Parent>::init_stub(RuntimeState* state) {
+    if (_brpc_dest_addr.hostname.empty()) {
+        LOG(WARNING) << "there is no brpc destination address's hostname"
+                        ", maybe version is not compatible.";
+        return Status::InternalError("no brpc destination");
+    }
+    if (state->query_options().__isset.enable_local_exchange) {
+        _is_local &= state->query_options().enable_local_exchange;
+    }
+    if (_is_local) {
+        
RETURN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                _fragment_instance_id, _dest_node_id, &_local_recvr));
+        return Status::OK();
+    }
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
@@ -93,15 +142,27 @@ Status Channel<Parent>::init(RuntimeState* state) {
         LOG(WARNING) << msg;
         return Status::InternalError(msg);
     }
+    return Status::OK();
+}
 
-    if (state->query_options().__isset.enable_local_exchange) {
-        _is_local &= state->query_options().enable_local_exchange;
-    }
+template <typename Parent>
+Status Channel<Parent>::open(RuntimeState* state) {
+    _be_number = state->be_number();
+    _brpc_request = std::make_shared<PTransmitDataParams>();
+    // initialize brpc request
+    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
+    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
+    _finst_id = _brpc_request->finst_id();
 
-    if (_is_local) {
-        _local_recvr = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
-                _fragment_instance_id, _dest_node_id);
-    }
+    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
+    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
+    _query_id = _brpc_request->query_id();
+
+    _brpc_request->set_node_id(_dest_node_id);
+    _brpc_request->set_sender_id(_parent->sender_id());
+    _brpc_request->set_be_number(_be_number);
+
+    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
 
     _serializer.set_is_local(_is_local);
 
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 726f5b7935b..b9462434f07 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -286,6 +286,9 @@ public:
     // Returns OK if successful, error indication otherwise.
     Status init(RuntimeState* state);
 
+    Status init_stub(RuntimeState* state);
+    Status open(RuntimeState* state);
+
     // Asynchronously sends a row batch.
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
@@ -396,8 +399,8 @@ protected:
     PUniqueId _finst_id;
     PUniqueId _query_id;
     PBlock _pb_block;
-    std::shared_ptr<PTransmitDataParams> _brpc_request;
-    std::shared_ptr<PBackendService_Stub> _brpc_stub;
+    std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
+    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
     std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> 
_send_remote_block_callback;
     Status _receiver_status;
     int32_t _brpc_timeout_ms = 500;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3ae1e5723ac..951371de80c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -902,9 +902,7 @@ public class Coordinator implements CoordInterface {
             // For example: select * from numbers("number"="10") will generate 
ExchangeNode and
             // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
-            boolean twoPhaseExecution = ConnectContext.get() != null
-                    && 
ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt()
-                    ? fragments.size() > 1 && addressToBackendID.size() > 1 : 
fragments.size() > 1;
+            boolean twoPhaseExecution = fragments.size() > 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fb0d8a42237..9581f5e600b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -212,8 +212,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree";
 
-    public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = 
"enable_single_phase_execution_commit_opt";
-
     public static final String MAX_JOIN_NUMBER_BUSHY_TREE = 
"max_join_number_bushy_tree";
     public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
 
@@ -1207,10 +1205,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = 
VariableAnnotation.DEPRECATED)
     public boolean enableLocalExchange = true;
 
-    @VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, 
fuzzy = true,
-            varType = VariableAnnotation.DEPRECATED)
-    private boolean enableSinglePhaseExecutionCommitOpt = true;
-
     /**
      * For debug purpose, don't merge unique key and agg key when reading data.
      */
@@ -1758,7 +1752,6 @@ public class SessionVariable implements Serializable, 
Writable {
         this.parallelPipelineTaskNum = random.nextInt(8);
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
-        this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
@@ -3611,8 +3604,4 @@ public class SessionVariable implements Serializable, 
Writable {
     public int getMaxMsgSizeOfResultReceiver() {
         return this.maxMsgSizeOfResultReceiver;
     }
-
-    public boolean isEnableSinglePhaseExecutionCommitOpt() {
-        return enableSinglePhaseExecutionCommitOpt;
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to