This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new acfb24f2804 [refactor](pipelineX) Simplify blocking state (#34408)
acfb24f2804 is described below

commit acfb24f28043f8cbc2a700c7d29a9ee082fec6bb
Author: Gabriel <[email protected]>
AuthorDate: Tue May 7 21:14:21 2024 +0800

    [refactor](pipelineX) Simplify blocking state (#34408)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 102 ++++++---------------
 be/src/pipeline/exec/exchange_sink_buffer.h        |  20 ++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  20 ++--
 be/src/pipeline/exec/exchange_sink_operator.h      |  17 ++--
 be/src/pipeline/exec/operator.h                    |   6 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   2 +-
 be/src/pipeline/pipeline_task.cpp                  |  60 ++----------
 be/src/pipeline/pipeline_task.h                    |  50 ++--------
 be/src/pipeline/task_scheduler.cpp                 |   5 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  65 ++++++++++---
 be/src/vec/sink/vdata_stream_sender.h              |  81 +++++-----------
 be/src/vec/sink/vresult_file_sink.cpp              |   3 +-
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   4 +-
 be/src/vec/sink/writer/vfile_result_writer.h       |   8 +-
 14 files changed, 150 insertions(+), 293 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7f46bfcf353..a8cba5cca04 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -83,9 +83,8 @@ std::shared_ptr<BroadcastPBlockHolder> 
BroadcastPBlockHolderQueue::pop() {
 
 namespace pipeline {
 
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id,
-                                               int send_id, int be_number, 
RuntimeState* state)
+ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id, int send_id,
+                                       int be_number, RuntimeState* state)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
@@ -96,11 +95,9 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId 
query_id, PlanNodeId de
           _state(state),
           _context(state->get_query_ctx()) {}
 
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
+ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::close() {
+void ExchangeSinkBuffer::close() {
     // Could not clear the queue here, because there maybe a running rpc want 
to
     // get a request from the queue, and clear method will release the request
     // and it will core.
@@ -109,8 +106,7 @@ void ExchangeSinkBuffer<Parent>::close() {
     //_instance_to_request.clear();
 }
 
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::can_write() const {
+bool ExchangeSinkBuffer::can_write() const {
     size_t max_package_size =
             config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
     size_t total_package_size = 0;
@@ -120,42 +116,13 @@ bool ExchangeSinkBuffer<Parent>::can_write() const {
     return total_package_size <= max_package_size;
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_set_ready_to_finish(bool all_done) {
+void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
     if (_finish_dependency && _should_stop && all_done) {
         _finish_dependency->set_ready();
     }
 }
 
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
-    //note(wb) angly implementation here, because operator couples the 
scheduling logic
-    // graceful implementation maybe as follows:
-    // 1 make ExchangeSinkBuffer support try close which calls 
brpc::StartCancel
-    // 2 make BlockScheduler calls tryclose when query is cancel
-    DCHECK(_context != nullptr);
-    bool need_cancel = _context->is_cancelled();
-
-    for (auto& pair : _instance_to_package_queue_mutex) {
-        std::unique_lock<std::mutex> lock(*(pair.second));
-        auto& id = pair.first;
-        if (!_rpc_channel_is_idle.at(id)) {
-            // when pending finish, we need check whether current query is 
cancelled
-            if (need_cancel && _instance_to_rpc_ctx.find(id) != 
_instance_to_rpc_ctx.end()) {
-                auto& rpc_ctx = _instance_to_rpc_ctx[id];
-                if (!rpc_ctx.is_cancelled) {
-                    
brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id());
-                    rpc_ctx.is_cancelled = true;
-                }
-            }
-            return true;
-        }
-    }
-    return false;
-}
-
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) 
{
+void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     if (_is_finishing) {
         return;
     }
@@ -165,10 +132,9 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId 
fragment_instance_id) {
     }
     _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
     _instance_to_seq[low_id] = 0;
-    _instance_to_package_queue[low_id] =
-            std::queue<TransmitInfo<Parent>, 
std::list<TransmitInfo<Parent>>>();
+    _instance_to_package_queue[low_id] = std::queue<TransmitInfo, 
std::list<TransmitInfo>>();
     _instance_to_broadcast_package_queue[low_id] =
-            std::queue<BroadcastTransmitInfo<Parent>, 
std::list<BroadcastTransmitInfo<Parent>>>();
+            std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>();
     _queue_capacity =
             config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
     PUniqueId finst_id;
@@ -181,8 +147,7 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId 
fragment_instance_id) {
     _construct_request(low_id, finst_id);
 }
 
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
+Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
     if (_is_finishing) {
         return Status::OK();
     }
@@ -212,8 +177,7 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& 
request) {
+Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
     if (_is_finishing) {
         return Status::OK();
     }
@@ -239,16 +203,14 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
     return Status::OK();
 }
 
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
+Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
 
     DCHECK(_rpc_channel_is_idle[id] == false);
 
-    std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>& q =
-            _instance_to_package_queue[id];
-    std::queue<BroadcastTransmitInfo<Parent>, 
std::list<BroadcastTransmitInfo<Parent>>>&
-            broadcast_q = _instance_to_broadcast_package_queue[id];
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
+            _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
         _rpc_channel_is_idle[id] = true;
@@ -417,8 +379,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
     return Status::OK();
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId 
finst_id) {
+void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId 
finst_id) {
     _instance_to_request[id] = std::make_shared<PTransmitDataParams>();
     _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
     _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);
@@ -428,8 +389,7 @@ void 
ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId f
     _instance_to_request[id]->set_be_number(_be_number);
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
+void ExchangeSinkBuffer::_ended(InstanceLoId id) {
     if (!_instance_to_package_queue_mutex.template contains(id)) {
         std::stringstream ss;
         ss << "failed find the instance id:" << id
@@ -450,33 +410,29 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
     }
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& 
err) {
+void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
     _context->cancel(err, Status::Cancelled(err));
     _ended(id);
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
+void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
     if (!_rpc_channel_is_idle[id]) {
         _rpc_channel_is_idle[id] = true;
         _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
-    std::queue<BroadcastTransmitInfo<Parent>, 
std::list<BroadcastTransmitInfo<Parent>>> empty;
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
     swap(empty, _instance_to_broadcast_package_queue[id]);
 }
 
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::_is_receiver_eof(InstanceLoId id) {
+bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     return _instance_to_receiver_eof[id];
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, 
int64_t* min_time) {
+void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* 
min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
     for (auto& [id, time] : _instance_to_rpc_time) {
@@ -489,8 +445,7 @@ void 
ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t
     *min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
 }
 
-template <typename Parent>
-int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
+int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
     int64_t sum_time = 0;
     for (auto& [id, time] : _instance_to_rpc_time) {
         sum_time += time;
@@ -498,9 +453,8 @@ int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
     return sum_time;
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId id, int64_t 
start_rpc_time,
-                                              int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
+                                      int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
     DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
@@ -509,8 +463,7 @@ void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId 
id, int64_t start_rpc
     }
 }
 
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
+void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
     auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
     auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
     auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
@@ -528,8 +481,5 @@ void 
ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), 
_rpc_count.load()));
 }
 
-template class ExchangeSinkBuffer<vectorized::VDataStreamSender>;
-template class ExchangeSinkBuffer<pipeline::ExchangeSinkLocalState>;
-
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 74207bbffd3..2955ff959de 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -51,7 +51,6 @@ class Dependency;
 
 namespace vectorized {
 class VDataStreamSender;
-template <typename>
 class PipChannel;
 
 template <typename T>
@@ -119,17 +118,15 @@ private:
 } // namespace vectorized
 
 namespace pipeline {
-template <typename Parent>
 struct TransmitInfo {
-    vectorized::PipChannel<Parent>* channel = nullptr;
+    vectorized::PipChannel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
     Status exec_status;
 };
 
-template <typename Parent>
 struct BroadcastTransmitInfo {
-    vectorized::PipChannel<Parent>* channel = nullptr;
+    vectorized::PipChannel* channel = nullptr;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };
@@ -195,7 +192,6 @@ struct ExchangeRpcContext {
 };
 
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
-template <typename Parent>
 class ExchangeSinkBuffer : public HasTaskExecutionCtx {
 public:
     ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, int be_number,
@@ -203,10 +199,9 @@ public:
     ~ExchangeSinkBuffer();
     void register_sink(TUniqueId);
 
-    Status add_block(TransmitInfo<Parent>&& request);
-    Status add_block(BroadcastTransmitInfo<Parent>&& request);
+    Status add_block(TransmitInfo&& request);
+    Status add_block(BroadcastTransmitInfo&& request);
     bool can_write() const;
-    bool is_pending_finish();
     void close();
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
@@ -233,13 +228,12 @@ private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
     // store data in non-broadcast shuffle
-    phmap::flat_hash_map<InstanceLoId,
-                         std::queue<TransmitInfo<Parent>, 
std::list<TransmitInfo<Parent>>>>
+    phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, 
std::list<TransmitInfo>>>
             _instance_to_package_queue;
     size_t _queue_capacity;
     // store data in broadcast shuffle
-    phmap::flat_hash_map<InstanceLoId, 
std::queue<BroadcastTransmitInfo<Parent>,
-                                                  
std::list<BroadcastTransmitInfo<Parent>>>>
+    phmap::flat_hash_map<InstanceLoId,
+                         std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>>
             _instance_to_broadcast_package_queue;
     using PackageSeq = int64_t;
     // must init zero
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 943eea704c7..4a48f5ebaa8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -85,9 +85,9 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
         const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            channel_shared_ptrs.emplace_back(new 
vectorized::PipChannel<ExchangeSinkLocalState>(
-                    this, p._row_desc, p._dests[i].brpc_server, 
fragment_instance_id,
-                    p._dest_node_id));
+            channel_shared_ptrs.emplace_back(
+                    new vectorized::PipChannel(this, p._row_desc, 
p._dests[i].brpc_server,
+                                               fragment_instance_id, 
p._dest_node_id));
             fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                  channel_shared_ptrs.size() - 
1);
             channels.push_back(channel_shared_ptrs.back().get());
@@ -131,8 +131,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
-    _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), state);
+    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, 
_sender_id,
+                                                        _state->be_number(), 
state);
 
     register_channels(_sink_buffer.get());
     _queue_dependency =
@@ -410,7 +410,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
-        vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+        vectorized::PipChannel* current_channel =
                 local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
@@ -494,7 +494,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
         // 1. select channel
-        vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+        vectorized::PipChannel* current_channel =
                 local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
@@ -560,11 +560,9 @@ Status 
ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
     return Status::OK();
 }
 
-void ExchangeSinkLocalState::register_channels(
-        pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer) {
+void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* 
buffer) {
     for (auto channel : channels) {
-        ((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)
-                ->register_exchange_buffer(buffer);
+        ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
     }
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index dd9dad12b58..df68c8de74b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -58,9 +58,9 @@ public:
               current_channel_idx(0),
               only_local_exchange(false),
               _serializer(this) {
-        _finish_dependency = std::make_shared<FinishDependency>(
-                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-                state->get_query_ctx());
+        _finish_dependency = 
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
+                                                          parent->get_name() + 
"_FINISH_DEPENDENCY",
+                                                          true, 
state->get_query_ctx());
     }
 
     std::vector<Dependency*> dependencies() const override {
@@ -78,7 +78,7 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
     Status serialize_block(vectorized::Block* src, PBlock* dest, int 
num_receivers = 1);
-    void 
register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
+    void register_channels(pipeline::ExchangeSinkBuffer* buffer);
     Status 
get_next_available_buffer(std::shared_ptr<vectorized::BroadcastPBlockHolder>* 
holder);
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -110,9 +110,8 @@ public:
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
-    
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
-            channel_shared_ptrs;
+    std::vector<vectorized::PipChannel*> channels;
+    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
     int current_channel_idx; // index of current channel to send to if _random 
== true
     bool only_local_exchange;
 
@@ -123,10 +122,10 @@ public:
 private:
     friend class ExchangeSinkOperatorX;
     friend class vectorized::Channel<ExchangeSinkLocalState>;
-    friend class vectorized::PipChannel<ExchangeSinkLocalState>;
+    friend class vectorized::PipChannel;
     friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
 
-    std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer = 
nullptr;
+    std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
     RuntimeProfile::Counter* _compress_timer = nullptr;
     RuntimeProfile::Counter* _brpc_send_timer = nullptr;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d80a76062d4..e4560c571c7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -877,9 +877,9 @@ public:
     using Base = PipelineXSinkLocalState<FakeSharedState>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {
-        _finish_dependency = std::make_shared<FinishDependency>(
-                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-                state->get_query_ctx());
+        _finish_dependency = 
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
+                                                          parent->get_name() + 
"_FINISH_DEPENDENCY",
+                                                          true, 
state->get_query_ctx());
     }
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index f8cadad1df7..fa2794c4c92 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -103,7 +103,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
         // create writer
         _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
                 p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
-                _output_vexpr_ctxs, _sender.get(), nullptr, 
state->return_object_data_as_binary(),
+                _output_vexpr_ctxs, _sender, nullptr, 
state->return_object_data_as_binary(),
                 p._output_row_descriptor));
     } else {
         // init channel
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 3f4a875e8ed..2c211835c55 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -169,23 +169,15 @@ void PipelineTask::_init_profile() {
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
-    _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
 
     _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
-    _block_by_source_counts = ADD_COUNTER(_task_profile, 
"NumBlockedBySrcTimes", TUnit::UNIT);
-    _block_by_sink_counts = ADD_COUNTER(_task_profile, 
"NumBlockedBySinkTimes", TUnit::UNIT);
     _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", 
TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
     _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", 
TUnit::UNIT);
-
-    _wait_bf_counts = ADD_COUNTER(_task_profile, "WaitBfTimes", TUnit::UNIT);
-    _wait_dependency_counts = ADD_COUNTER(_task_profile, 
"WaitDenpendencyTimes", TUnit::UNIT);
-    _pending_finish_counts = ADD_COUNTER(_task_profile, "PendingFinishTimes", 
TUnit::UNIT);
 }
 
 void PipelineTask::_fresh_profile_counter() {
-    COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time());
     COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
     COUNTER_SET(_wait_worker_timer, 
(int64_t)_wait_worker_watcher.elapsed_time());
 }
@@ -233,12 +225,8 @@ Status PipelineTask::execute(bool* eos) {
         }
     }};
     *eos = false;
-    if (has_dependency()) {
-        set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
-        return Status::OK();
-    }
-    if (_runtime_filter_blocked_dependency() != nullptr) {
-        set_state(PipelineTaskState::BLOCKED_FOR_RF);
+    if (has_dependency() || _runtime_filter_blocked_dependency() != nullptr) {
+        set_state(PipelineTaskState::BLOCKED);
         return Status::OK();
     }
     // The status must be runnable
@@ -247,24 +235,16 @@ Status PipelineTask::execute(bool* eos) {
             SCOPED_RAW_TIMER(&time_spent);
             RETURN_IF_ERROR(_open());
         }
-        if (!source_can_read()) {
-            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
-            return Status::OK();
-        }
-        if (!sink_can_write()) {
-            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
+        if (!source_can_read() || !sink_can_write()) {
+            set_state(PipelineTaskState::BLOCKED);
             return Status::OK();
         }
     }
 
     Status status = Status::OK();
     while (!_fragment_context->is_canceled()) {
-        if (_root->need_data_from_children(_state) && !source_can_read()) {
-            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
-            break;
-        }
-        if (!sink_can_write()) {
-            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
+        if ((_root->need_data_from_children(_state) && !source_can_read()) || 
!sink_can_write()) {
+            set_state(PipelineTaskState::BLOCKED);
             break;
         }
 
@@ -379,34 +359,8 @@ void PipelineTask::set_state(PipelineTaskState state) {
     if (_cur_state == state) {
         return;
     }
-    if (_cur_state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
-        if (state == PipelineTaskState::RUNNABLE) {
-            _wait_source_watcher.stop();
-        }
-    } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_SINK) {
-        if (state == PipelineTaskState::RUNNABLE) {
-            _wait_sink_watcher.stop();
-        }
-    } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_RF) {
-        if (state == PipelineTaskState::RUNNABLE) {
-            _wait_bf_watcher.stop();
-        }
-    } else if (_cur_state == PipelineTaskState::RUNNABLE) {
+    if (_cur_state == PipelineTaskState::RUNNABLE && state != 
PipelineTaskState::RUNNABLE) {
         COUNTER_UPDATE(_block_counts, 1);
-        if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
-            _wait_source_watcher.start();
-            COUNTER_UPDATE(_block_by_source_counts, 1);
-        } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
-            _wait_sink_watcher.start();
-            COUNTER_UPDATE(_block_by_sink_counts, 1);
-        } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
-            _wait_bf_watcher.start();
-            COUNTER_UPDATE(_wait_bf_counts, 1);
-        } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
-            COUNTER_UPDATE(_wait_dependency_counts, 1);
-        } else if (state == PipelineTaskState::PENDING_FINISH) {
-            COUNTER_UPDATE(_pending_finish_counts, 1);
-        }
     }
 
     _cur_state = state;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index cebce249f4d..06c2cea3e2c 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -69,27 +69,21 @@ namespace doris::pipeline {
  */
 enum class PipelineTaskState : uint8_t {
     NOT_READY = 0, // do not prepare
-    BLOCKED_FOR_DEPENDENCY = 1,
-    BLOCKED_FOR_SOURCE = 2,
-    BLOCKED_FOR_SINK = 3,
-    RUNNABLE = 4, // can execute
+    BLOCKED = 1,   // blocked by dependency
+    RUNNABLE = 2,  // can execute
     PENDING_FINISH =
-            5, // compute task is over, but still hold resource. like some 
scan and sink task
-    FINISHED = 6,
-    CANCELED = 7,
-    BLOCKED_FOR_RF = 8,
+            3,    // compute task is over, but still hold resource. like some 
scan and sink task
+    FINISHED = 4, // finish with a regular state
+    CANCELED = 5, // being cancelled
+
 };
 
 inline const char* get_state_name(PipelineTaskState idx) {
     switch (idx) {
     case PipelineTaskState::NOT_READY:
         return "NOT_READY";
-    case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
-        return "BLOCKED_FOR_DEPENDENCY";
-    case PipelineTaskState::BLOCKED_FOR_SOURCE:
-        return "BLOCKED_FOR_SOURCE";
-    case PipelineTaskState::BLOCKED_FOR_SINK:
-        return "BLOCKED_FOR_SINK";
+    case PipelineTaskState::BLOCKED:
+        return "BLOCKED";
     case PipelineTaskState::RUNNABLE:
         return "RUNNABLE";
     case PipelineTaskState::PENDING_FINISH:
@@ -98,23 +92,10 @@ inline const char* get_state_name(PipelineTaskState idx) {
         return "FINISHED";
     case PipelineTaskState::CANCELED:
         return "CANCELED";
-    case PipelineTaskState::BLOCKED_FOR_RF:
-        return "BLOCKED_FOR_RF";
     }
-    LOG(FATAL) << "__builtin_unreachable";
     __builtin_unreachable();
 }
 
-inline bool is_final_state(PipelineTaskState idx) {
-    switch (idx) {
-    case PipelineTaskState::FINISHED:
-    case PipelineTaskState::CANCELED:
-        return true;
-    default:
-        return false;
-    }
-}
-
 class TaskQueue;
 class PriorityTaskQueue;
 class Dependency;
@@ -168,8 +149,6 @@ public:
 
     void finalize();
 
-    bool is_finished() const { return _finished.load(); }
-
     std::string debug_string();
 
     bool is_pending_finish() { return _finish_blocked_dependency() != nullptr; 
}
@@ -212,8 +191,6 @@ public:
 
     OperatorXPtr source() const { return _source; }
 
-    OperatorXs operatorXs() { return _operators; }
-
     int task_id() const { return _index; };
 
     void clear_blocking_state() {
@@ -261,7 +238,6 @@ public:
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
     PipelineTaskState get_state() const { return _cur_state; }
     void set_state(PipelineTaskState state);
-    TUniqueId instance_id() const { return _state->fragment_instance_id(); }
 
     bool is_running() { return _running.load(); }
     void set_running(bool running) { _running = running; }
@@ -349,7 +325,6 @@ private:
 
     uint32_t _index;
     PipelinePtr _pipeline;
-    bool _dependency_finish = false;
     bool _has_exceed_timeout = false;
     bool _prepared;
     bool _opened;
@@ -384,18 +359,9 @@ private:
     RuntimeProfile::Counter* _sink_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _block_counts = nullptr;
-    RuntimeProfile::Counter* _block_by_source_counts = nullptr;
-    RuntimeProfile::Counter* _block_by_sink_counts = nullptr;
     RuntimeProfile::Counter* _schedule_counts = nullptr;
-    MonotonicStopWatch _wait_source_watcher;
-    MonotonicStopWatch _wait_bf_watcher;
-    RuntimeProfile::Counter* _wait_bf_timer = nullptr;
-    RuntimeProfile::Counter* _wait_bf_counts = nullptr;
-    MonotonicStopWatch _wait_sink_watcher;
     MonotonicStopWatch _wait_worker_watcher;
     RuntimeProfile::Counter* _wait_worker_timer = nullptr;
-    RuntimeProfile::Counter* _wait_dependency_counts = nullptr;
-    RuntimeProfile::Counter* _pending_finish_counts = nullptr;
     // TODO we should calculate the time between when really runnable and 
runnable
     RuntimeProfile::Counter* _yield_counts = nullptr;
     RuntimeProfile::Counter* _core_change_times = nullptr;
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index b32343acb2e..ed88f19bae4 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -227,10 +227,7 @@ void TaskScheduler::_do_work(size_t index) {
 
         auto pipeline_state = task->get_state();
         switch (pipeline_state) {
-        case PipelineTaskState::BLOCKED_FOR_SOURCE:
-        case PipelineTaskState::BLOCKED_FOR_SINK:
-        case PipelineTaskState::BLOCKED_FOR_RF:
-        case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
+        case PipelineTaskState::BLOCKED:
             task->set_running(false);
             break;
         case PipelineTaskState::RUNNABLE:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index cf805b5f0c4..2e79ed0dd5e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -176,19 +176,57 @@ Status Channel<Parent>::open(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-std::shared_ptr<pipeline::Dependency> 
PipChannel<Parent>::get_local_channel_dependency() {
-    if (!Channel<Parent>::_local_recvr) {
-        if constexpr (std::is_same_v<pipeline::ExchangeSinkLocalState, 
Parent>) {
-            throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "_local_recvr is null: " +
-                                    
std::to_string(Channel<Parent>::_parent->parent()->node_id()));
+std::shared_ptr<pipeline::Dependency> 
PipChannel::get_local_channel_dependency() {
+    if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+        throw Exception(
+                ErrorCode::INTERNAL_ERROR,
+                "_local_recvr is null: " +
+                        
std::to_string(Channel<pipeline::ExchangeSinkLocalState>::_parent->parent()
+                                               ->node_id()));
+    }
+    return 
Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
+            Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+}
+
+Status PipChannel::send_remote_block(PBlock* block, bool eos, Status 
exec_status) {
+    
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
 1);
+    std::unique_ptr<PBlock> pblock_ptr;
+    pblock_ptr.reset(block);
+
+    if (eos) {
+        if (_eos_send) {
+            return Status::OK();
         } else {
-            throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
+            _eos_send = true;
+        }
+    }
+    if (eos || block->column_metas_size()) {
+        RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, 
exec_status}));
+    }
+    return Status::OK();
+}
+
+Status 
PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos) {
+    
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
 1);
+    if (eos) {
+        if (_eos_send) {
+            return Status::OK();
         }
+        _eos_send = true;
     }
-    return Channel<Parent>::_local_recvr->get_local_channel_dependency(
-            Channel<Parent>::_parent->sender_id());
+    if (eos || block->get_block()->column_metas_size()) {
+        RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
+    }
+    return Status::OK();
+}
+
+Status PipChannel::send_current_block(bool eos, Status exec_status) {
+    if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
+        return 
Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+    }
+    
SCOPED_CONSUME_MEM_TRACKER(Channel<pipeline::ExchangeSinkLocalState>::_parent->mem_tracker());
+    RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
+    return Status::OK();
 }
 
 template <typename Parent>
@@ -885,10 +923,9 @@ Status VDataStreamSender::_get_next_available_buffer(
     }
 }
 
-void VDataStreamSender::register_pipeline_channels(
-        pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
+void 
VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer* 
buffer) {
     for (auto channel : _channels) {
-        
((PipChannel<VDataStreamSender>*)channel)->register_exchange_buffer(buffer);
+        ((PipChannel*)channel)->register_exchange_buffer(buffer);
     }
 }
 
@@ -910,8 +947,6 @@ bool VDataStreamSender::channel_all_can_write() {
 
 template class Channel<pipeline::ExchangeSinkLocalState>;
 template class Channel<VDataStreamSender>;
-template class PipChannel<pipeline::ExchangeSinkLocalState>;
-template class PipChannel<VDataStreamSender>;
 template class Channel<pipeline::ResultFileSinkLocalState>;
 template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
 template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 959d6ead10f..5ca31bcbe44 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -67,9 +67,9 @@ enum CompressionTypePB : int;
 } // namespace segment_v2
 
 namespace pipeline {
-class ExchangeSinkOperator;
 class ExchangeSinkOperatorX;
 class Dependency;
+class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
@@ -109,7 +109,6 @@ struct ShuffleChannelIds {
 
 class VDataStreamSender : public DataSink {
 public:
-    friend class pipeline::ExchangeSinkOperator;
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                       const RowDescriptor& row_desc, const TDataStreamSink& 
sink,
                       const std::vector<TPlanFragmentDestination>& 
destinations);
@@ -131,7 +130,7 @@ public:
 
     RuntimeState* state() { return _state; }
 
-    void 
register_pipeline_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* 
buffer);
+    void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer);
 
     bool channel_all_can_write();
 
@@ -154,8 +153,7 @@ public:
 protected:
     friend class BlockSerializer<VDataStreamSender>;
     friend class Channel<VDataStreamSender>;
-    friend class PipChannel<VDataStreamSender>;
-    friend class pipeline::ExchangeSinkBuffer<VDataStreamSender>;
+    friend class PipChannel;
 
     void _roll_pb_block();
     Status _get_next_available_buffer(std::shared_ptr<BroadcastPBlockHolder>* 
holder);
@@ -254,7 +252,7 @@ template <typename Parent = VDataStreamSender>
 class Channel {
 public:
     friend class VDataStreamSender;
-    friend class pipeline::ExchangeSinkBuffer<Parent>;
+    friend class pipeline::ExchangeSinkBuffer;
     // Create channel to send data to particular ipaddress/port/query/node
     // combination. buffer_size is specified in bytes and a soft limit on
     // how much tuple data is getting accumulated before being sent; it only 
applies
@@ -465,18 +463,19 @@ Status VDataStreamSender::channel_add_rows_with_idx(
     return Status::OK();
 }
 
-template <typename Parent = VDataStreamSender>
-class PipChannel final : public Channel<Parent> {
+class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
 public:
-    PipChannel(Parent* parent, const RowDescriptor& row_desc, const 
TNetworkAddress& brpc_dest,
-               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
-            : Channel<Parent>(parent, row_desc, brpc_dest, 
fragment_instance_id, dest_node_id) {
+    PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& 
row_desc,
+               const TNetworkAddress& brpc_dest, const TUniqueId& 
fragment_instance_id,
+               PlanNodeId dest_node_id)
+            : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, 
brpc_dest,
+                                                        fragment_instance_id, 
dest_node_id) {
         ch_roll_pb_block();
     }
 
     ~PipChannel() override {
-        if (Channel<Parent>::_ch_cur_pb_block) {
-            delete Channel<Parent>::_ch_cur_pb_block;
+        if (Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block) {
+            delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block;
         }
     }
 
@@ -487,7 +486,7 @@ public:
         // 2. Create a new PBlock every time. In this way we don't need a lock 
but have to allocate
         //    new memory.
         // Now we use the second way.
-        Channel<Parent>::_ch_cur_pb_block = new PBlock();
+        Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new 
PBlock();
     }
 
     // Asynchronously sends a block
@@ -495,48 +494,21 @@ public:
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
     Status send_remote_block(PBlock* block, bool eos = false,
-                             Status exec_status = Status::OK()) override {
-        COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
-        std::unique_ptr<PBlock> pblock_ptr;
-        pblock_ptr.reset(block);
-
-        if (eos) {
-            if (_eos_send) {
-                return Status::OK();
-            } else {
-                _eos_send = true;
-            }
-        }
-        if (eos || block->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), 
eos, exec_status}));
-        }
-        return Status::OK();
-    }
+                             Status exec_status = Status::OK()) override;
 
     Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                bool eos = false) override {
-        COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
-        if (eos) {
-            if (_eos_send) {
-                return Status::OK();
-            }
-            _eos_send = true;
-        }
-        if (eos || block->get_block()->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
-        }
-        return Status::OK();
-    }
+                                bool eos = false) override;
 
     Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
override {
-        if (Channel<Parent>::_fragment_instance_id.lo == -1) {
+        if 
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
 
         bool serialized = false;
         _pblock = std::make_unique<PBlock>();
-        RETURN_IF_ERROR(Channel<Parent>::_serializer.next_serialized_block(
-                block, _pblock.get(), 1, &serialized, eos, &rows));
+        RETURN_IF_ERROR(
+                
Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
+                        block, _pblock.get(), 1, &serialized, eos, &rows));
         if (serialized) {
             Status exec_status = Status::OK();
             RETURN_IF_ERROR(send_current_block(eos, exec_status));
@@ -546,18 +518,11 @@ public:
     }
 
     // send _mutable_block
-    Status send_current_block(bool eos, Status exec_status) override {
-        if (Channel<Parent>::is_local()) {
-            return Channel<Parent>::send_local_block(exec_status, eos);
-        }
-        SCOPED_CONSUME_MEM_TRACKER(Channel<Parent>::_parent->mem_tracker());
-        RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, 
exec_status));
-        return Status::OK();
-    }
+    Status send_current_block(bool eos, Status exec_status) override;
 
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer<Parent>* 
buffer) {
+    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
         _buffer = buffer;
-        _buffer->register_sink(Channel<Parent>::_fragment_instance_id);
+        
_buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
     }
 
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
@@ -576,7 +541,7 @@ public:
 private:
     friend class VDataStreamSender;
 
-    pipeline::ExchangeSinkBuffer<Parent>* _buffer = nullptr;
+    pipeline::ExchangeSinkBuffer* _buffer = nullptr;
     bool _eos_send = false;
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
_send_callback;
     std::unique_ptr<PBlock> _pblock;
diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp
index bb7630fccee..e10ebec7806 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -83,8 +83,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, 
state->fragment_instance_id(), _output_vexpr_ctxs,
-                _sender.get(), nullptr, state->return_object_data_as_binary(),
-                _output_row_descriptor));
+                _sender, nullptr, state->return_object_data_as_binary(), 
_output_row_descriptor));
     } else {
         // init channel
         _output_block =
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index d83b3ce5103..d8f6cfa5747 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -66,8 +66,8 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions* 
file_opts,
                                      const TStorageBackendType::type 
storage_type,
                                      const TUniqueId fragment_instance_id,
                                      const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                     BufferControlBlock* sinker, Block* 
output_block,
-                                     bool output_object_data,
+                                     std::shared_ptr<BufferControlBlock> 
sinker,
+                                     Block* output_block, bool 
output_object_data,
                                      const RowDescriptor& 
output_row_descriptor)
         : AsyncResultWriter(output_vexpr_ctxs),
           _file_opts(file_opts),
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index 72ba90cd015..30d5893a01d 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -52,9 +52,9 @@ public:
     VFileResultWriter(const ResultFileOptions* file_option,
                       const TStorageBackendType::type storage_type,
                       const TUniqueId fragment_instance_id,
-                      const VExprContextSPtrs& _output_vexpr_ctxs, 
BufferControlBlock* sinker,
-                      Block* output_block, bool output_object_data,
-                      const RowDescriptor& output_row_descriptor);
+                      const VExprContextSPtrs& _output_vexpr_ctxs,
+                      std::shared_ptr<BufferControlBlock> sinker, Block* 
output_block,
+                      bool output_object_data, const RowDescriptor& 
output_row_descriptor);
 
     VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
@@ -129,7 +129,7 @@ private:
     RuntimeProfile::Counter* _written_data_bytes = nullptr;
 
     // _sinker and _output_batch are not owned by FileResultWriter
-    BufferControlBlock* _sinker = nullptr;
+    std::shared_ptr<BufferControlBlock> _sinker = nullptr;
     Block* _output_block = nullptr;
     // set to true if the final statistic result is sent
     bool _is_result_sent = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to