This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1eef30b493 [pipelineX](dependency) Wake up task by dependencies (#26879)
b1eef30b493 is described below

commit b1eef30b49318caa42110bede0535fc76dc669ac
Author: Gabriel <[email protected]>
AuthorDate: Sat Nov 18 03:20:24 2023 +0800

    [pipelineX](dependency) Wake up task by dependencies (#26879)
    
    ---------
    
    Co-authored-by: Mryange <[email protected]>
---
 be/src/http/action/pipeline_task_action.cpp        |  41 +++
 be/src/http/action/pipeline_task_action.h          |  36 +++
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  33 +-
 be/src/pipeline/exec/exchange_sink_buffer.h        |   8 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  15 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |  47 ++-
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +-
 be/src/pipeline/exec/exchange_source_operator.h    |   7 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |   3 +-
 .../exec/multi_cast_data_stream_source.cpp         |   3 +-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  |  31 ++
 be/src/pipeline/exec/multi_cast_data_streamer.h    |  20 +-
 .../pipeline/exec/partition_sort_source_operator.h |   2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  12 +-
 be/src/pipeline/exec/result_sink_operator.h        |  10 +-
 be/src/pipeline/exec/scan_operator.cpp             |  19 +-
 be/src/pipeline/exec/scan_operator.h               |  40 +--
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |   9 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |   4 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   2 +-
 be/src/pipeline/exec/union_source_operator.cpp     |   3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   3 +-
 be/src/pipeline/pipeline_fragment_context.h        |   8 +-
 be/src/pipeline/pipeline_task.h                    |   7 +
 be/src/pipeline/pipeline_x/dependency.cpp          | 230 +++++++++++++-
 be/src/pipeline/pipeline_x/dependency.h            | 340 +++++++++------------
 .../local_exchange_sink_operator.cpp               |   5 +-
 .../local_exchange_source_operator.cpp             |   2 +-
 be/src/pipeline/pipeline_x/operator.cpp            |  17 +-
 be/src/pipeline/pipeline_x/operator.h              |   8 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  25 +-
 .../pipeline_x/pipeline_x_fragment_context.h       |   2 +
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  75 +++--
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 101 +++---
 be/src/pipeline/task_scheduler.cpp                 |  56 +++-
 be/src/pipeline/task_scheduler.h                   |   1 -
 be/src/runtime/fragment_mgr.cpp                    |  18 ++
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/service/http_service.cpp                    |   6 +
 be/src/vec/exec/scan/scanner_context.cpp           |  54 ++--
 be/src/vec/exec/scan/scanner_context.h             |   4 +
 be/src/vec/runtime/vdata_stream_recvr.cpp          |   2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |   5 +-
 be/src/vec/sink/writer/async_result_writer.h       |   3 +-
 45 files changed, 858 insertions(+), 468 deletions(-)

diff --git a/be/src/http/action/pipeline_task_action.cpp 
b/be/src/http/action/pipeline_task_action.cpp
new file mode 100644
index 00000000000..c3b49b57136
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/pipeline_task_action.h"
+
+#include <sstream>
+#include <string>
+
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+void PipelineTaskAction::handle(HttpRequest* req) { // reply with FragmentMgr's pipeline-task dump
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; charset=utf-8");
+    HttpChannel::send_reply(req, HttpStatus::OK,
+                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks());
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/pipeline_task_action.h 
b/be/src/http/action/pipeline_task_action.h
new file mode 100644
index 00000000000..488a1148a53
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "http/http_handler.h"
+
+namespace doris {
+
+class HttpRequest;
+
+// HTTP handler that replies with FragmentMgr::dump_pipeline_tasks() as the response body.
+class PipelineTaskAction : public HttpHandler {
+public:
+    PipelineTaskAction() = default;
+
+    ~PipelineTaskAction() override = default;
+
+    void handle(HttpRequest* req) override;
+};
+
+} // end namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 46364528458..68f34ffc820 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -90,6 +90,13 @@ bool ExchangeSinkBuffer<Parent>::can_write() const {
     return total_package_size <= max_package_size;
 }
 
+template <typename Parent>
+void ExchangeSinkBuffer<Parent>::_set_ready_to_finish(bool all_done) {
+    // Ready only after set_should_stop() and once the caller reports no busy channel.
+    if (!_finish_dependency || !_should_stop || !all_done) return;
+    _finish_dependency->set_ready_to_finish();
+}
+
 template <typename Parent>
 bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
     //note(wb) angly implementation here, because operator couples the 
scheduling logic
@@ -160,9 +167,6 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
             send_now = true;
             _rpc_channel_is_idle[ins_id.lo] = false;
             _busy_channels++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         }
         _instance_to_package_queue[ins_id.lo].emplace(std::move(request));
         _total_queue_size++;
@@ -196,9 +200,6 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
             send_now = true;
             _rpc_channel_is_idle[ins_id.lo] = false;
             _busy_channels++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         }
         _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
     }
@@ -222,10 +223,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
 
     if (_is_finishing) {
         _rpc_channel_is_idle[id] = true;
-        _busy_channels--;
-        if (_finish_dependency && _busy_channels == 0) {
-            _finish_dependency->set_ready_to_finish();
-        }
+        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
         return Status::OK();
     }
 
@@ -361,10 +359,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         broadcast_q.pop();
     } else {
         _rpc_channel_is_idle[id] = true;
-        _busy_channels--;
-        if (_finish_dependency && _busy_channels == 0) {
-            _finish_dependency->set_ready_to_finish();
-        }
+        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
 
     return Status::OK();
@@ -396,11 +391,8 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
     } else {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[id]);
         if (!_rpc_channel_is_idle[id]) {
-            _busy_channels--;
             _rpc_channel_is_idle[id] = true;
-            if (_finish_dependency && _busy_channels == 0) {
-                _finish_dependency->set_ready_to_finish();
-            }
+            _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
         }
     }
 }
@@ -417,11 +409,8 @@ void 
ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
     if (!_rpc_channel_is_idle[id]) {
-        _busy_channels--;
         _rpc_channel_is_idle[id] = true;
-        if (_finish_dependency && _busy_channels == 0) {
-            _finish_dependency->set_ready_to_finish();
-        }
+        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 9111f553b27..edcfa80bc28 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -195,7 +195,14 @@ public:
     }
     void set_query_statistics(QueryStatistics* statistics) { _statistics = 
statistics; }
 
+    void set_should_stop() { // allow finishing once every RPC channel is idle
+        _should_stop = true; // store BEFORE reading _busy_channels so a racing last RPC is not missed
+        _set_ready_to_finish(_busy_channels == 0);
+    }
+
 private:
+    void _set_ready_to_finish(bool all_done);
+
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
     // store data in non-broadcast shuffle
@@ -244,6 +251,7 @@ private:
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
     std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
     QueryStatistics* _statistics = nullptr;
+    std::atomic<bool> _should_stop {false};
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 25418492954..f5518a1553e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -173,13 +173,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
 
     register_channels(_sink_buffer.get());
 
-    _exchange_sink_dependency = 
AndDependency::create_shared(_parent->operator_id());
-    _queue_dependency = 
ExchangeSinkQueueDependency::create_shared(_parent->operator_id());
+    _exchange_sink_dependency =
+            AndDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
+    _queue_dependency =
+            ExchangeSinkQueueDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
     _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 
1) &&
         !only_local_exchange) {
-        _broadcast_dependency = 
BroadcastDependency::create_shared(_parent->operator_id());
+        _broadcast_dependency =
+                BroadcastDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
         
_broadcast_dependency->set_available_block(config::num_broadcast_buffer);
         _broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
         for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
@@ -194,7 +197,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
         size_t dep_id = 0;
         _local_channels_dependency.resize(local_size);
         _wait_channel_timer.resize(local_size);
-        auto deps_for_channels = 
AndDependency::create_shared(_parent->operator_id());
+        auto deps_for_channels =
+                AndDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
         for (auto channel : channels) {
             if (channel->is_local()) {
                 _local_channels_dependency[dep_id] = 
channel->get_local_channel_dependency();
@@ -225,6 +229,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                                   fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     }
 
+    _finish_dependency->should_finish_after_check();
+
     return Status::OK();
 }
 
@@ -506,6 +512,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* 
state, Status exec_status)
             final_st = st;
         }
     }
+    local_state._sink_buffer->set_should_stop();
     return final_st;
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 6b9d3b5e4b1..9dc670cd668 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -67,7 +67,8 @@ private:
 class ExchangeSinkQueueDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
-    ExchangeSinkQueueDependency(int id) : WriteDependency(id, 
"ResultQueueDependency") {}
+    ExchangeSinkQueueDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultQueueDependency") {}
     ~ExchangeSinkQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -76,23 +77,23 @@ public:
 class BroadcastDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastDependency);
-    BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), 
_available_block(0) {}
+    BroadcastDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "BroadcastDependency"), 
_available_block(0) {}
     ~BroadcastDependency() override = default;
 
-    [[nodiscard]] WriteDependency* write_blocked_by() override {
-        if (config::enable_fuzzy_mode && _available_block == 0 &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _available_block > 0 ? nullptr : this;
-    }
-
     void set_available_block(int available_block) { _available_block = 
available_block; }
 
-    void return_available_block() { _available_block++; }
+    void return_available_block() { // give a broadcast buffer block back to the pool
+        _available_block++;
+        WriteDependency::set_ready_for_write(); // at least one block is free again
+    }
 
-    void take_available_block() { _available_block--; }
+    void take_available_block() { // consume a broadcast block; block writing when none remain
+        if (_available_block.fetch_sub(1) == 1) {
+            WriteDependency::block_writing(); // a concurrent return may have run first: re-check
+            if (_available_block > 0) WriteDependency::set_ready_for_write();
+        }
+    }
 
     void* shared_state() override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
@@ -134,25 +135,11 @@ private:
 class LocalExchangeChannelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
-    LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available)
-            : WriteDependency(id, "LocalExchangeChannelDependency"),
-              _mem_available(mem_available) {}
+    LocalExchangeChannelDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
     ~LocalExchangeChannelDependency() override = default;
-
-    WriteDependency* write_blocked_by() override {
-        if (config::enable_fuzzy_mode && !_is_runnable() &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _is_runnable() ? nullptr : this;
-    }
-
     void* shared_state() override { return nullptr; }
-
-private:
-    bool _is_runnable() const { return _ready_for_write || *_mem_available; }
-    std::shared_ptr<bool> _mem_available;
+    // TODO(gabriel): blocked by memory
 };
 
 class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3213ed55778..4a29694a37f 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -51,12 +51,13 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
             profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
-    source_dependency = AndDependency::create_shared(_parent->operator_id());
+    source_dependency = AndDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
     for (size_t i = 0; i < queues.size(); i++) {
-        deps[i] = 
ExchangeDataDependency::create_shared(_parent->operator_id(), queues[i]);
+        deps[i] = 
ExchangeDataDependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                        queues[i]);
         queues[i]->set_dependency(deps[i]);
         source_dependency->add_child(deps[i]);
     }
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index c41268f8eac..5d754747bee 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -53,8 +53,9 @@ public:
 struct ExchangeDataDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
-    ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* 
sender_queue)
-            : Dependency(id, "DataDependency"), _always_done(false) {}
+    ExchangeDataDependency(int id, int node_id,
+                           vectorized::VDataStreamRecvr::SenderQueue* 
sender_queue)
+            : Dependency(id, node_id, "DataDependency"), _always_done(false) {}
     void* shared_state() override { return nullptr; }
 
     void set_always_done() {
@@ -70,7 +71,7 @@ public:
         if (_always_done) {
             return;
         }
-        _ready_for_read = false;
+        Dependency::block_reading();
     }
 
 private:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index cb63f64ab42..3d9827a27b4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -49,7 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _shared_hash_table_dependency =
-            SharedHashTableDependency::create_shared(_parent->operator_id());
+            SharedHashTableDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 10056a30e72..a1815ca5118 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -49,7 +49,8 @@ class HashJoinBuildSinkOperatorX;
 class SharedHashTableDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
-    SharedHashTableDependency(int id) : WriteDependency(id, 
"SharedHashTableDependency") {}
+    SharedHashTableDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "SharedHashTableDependency") {}
     ~SharedHashTableDependency() override = default;
 
     void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 4c6e21c5c6d..26eee2161c9 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -138,7 +138,8 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
-    
static_cast<MultiCastDependency*>(_dependency)->set_consumer_id(p._consumer_id);
+    _shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(
+            p._consumer_id, static_cast<MultiCastDependency*>(_dependency));
     _output_expr_contexts.resize(p._output_expr_contexts.size());
     for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
         RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, 
_output_expr_contexts[i]));
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp 
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 3929c6ced09..8d7a745a042 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -17,6 +17,7 @@
 
 #include "multi_cast_data_streamer.h"
 
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/runtime_state.h"
 
 namespace doris::pipeline {
@@ -46,6 +47,9 @@ void MultiCastDataStreamer::pull(int sender_idx, 
doris::vectorized::Block* block
         }
     }
     *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
+    if (pos_to_pull == _multi_cast_blocks.end()) {
+        _block_reading(sender_idx);
+    }
 }
 
 void MultiCastDataStreamer::close_sender(int sender_idx) {
@@ -63,6 +67,7 @@ void MultiCastDataStreamer::close_sender(int sender_idx) {
         }
     }
     _closed_sender_count++;
+    _block_reading(sender_idx);
 }
 
 Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::vectorized::Block* block, bool eos) {
@@ -87,10 +92,36 @@ Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::vectorized::Block
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    // Nothing to wake when the streamer was built with with_dependencies == false.
+    if (!_dependencies.empty()) {
+        auto* consumer_dep = _dependencies[sender_idx];
+        DCHECK(consumer_dep);
+        consumer_dep->set_ready_for_read();
+    }
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+    for (size_t idx = 0; idx < _dependencies.size(); ++idx) { // wake every registered consumer
+        DCHECK(_dependencies[idx]);
+        _dependencies[idx]->set_ready_for_read();
+    }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {
+    // Nothing to block when the streamer was built with with_dependencies == false.
+    if (!_dependencies.empty()) {
+        auto* consumer_dep = _dependencies[sender_idx];
+        DCHECK(consumer_dep);
+        consumer_dep->block_reading();
+    }
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 92c0e24079e..4d03fe53b80 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -21,6 +21,7 @@
 
 namespace doris::pipeline {
 
+class MultiCastDependency;
 struct MultiCastBlock {
     MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
 
@@ -33,11 +34,16 @@ struct MultiCastBlock {
 // code
 class MultiCastDataStreamer {
 public:
-    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int 
cast_sender_count)
+    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int 
cast_sender_count,
+                          bool with_dependencies = false)
             : _row_desc(row_desc),
               _profile(pool->add(new 
RuntimeProfile("MultiCastDataStreamSink"))),
               _cast_sender_count(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, 
_multi_cast_blocks.end());
+        if (with_dependencies) {
+            _dependencies.resize(cast_sender_count, nullptr);
+        }
+
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
@@ -65,9 +71,19 @@ public:
     void set_eos() {
         std::lock_guard l(_mutex);
         _eos = true;
+        _set_ready_for_read();
+    }
+
+    void set_dep_by_sender_idx(int sender_idx, MultiCastDependency* dep) {
+        _dependencies[sender_idx] = dep;
+        _block_reading(sender_idx);
     }
 
 private:
+    void _set_ready_for_read(int sender_idx);
+    void _set_ready_for_read();
+    void _block_reading(int sender_idx);
+
     const RowDescriptor& _row_desc;
     RuntimeProfile* _profile;
     std::list<MultiCastBlock> _multi_cast_blocks;
@@ -80,5 +96,7 @@ private:
 
     RuntimeProfile::Counter* _process_rows;
     RuntimeProfile::Counter* _peak_mem_usage;
+
+    std::vector<MultiCastDependency*> _dependencies;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 40b802c35e6..9720e55efa7 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -66,7 +66,7 @@ private:
     friend class PartitionSortSourceOperatorX;
     RuntimeProfile::Counter* _get_sorted_timer;
     RuntimeProfile::Counter* _get_next_timer;
-    int _sort_idx = 0;
+    std::atomic<int> _sort_idx = 0;
 };
 
 class PartitionSortSourceOperatorX final : public 
OperatorX<PartitionSortSourceLocalState> {
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index b65a5ba1b86..e27b0da1d32 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -65,12 +65,16 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), 
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
-    _result_sink_dependency = 
OrDependency::create_shared(_parent->operator_id());
-    _buffer_dependency = 
ResultBufferDependency::create_shared(_parent->operator_id());
-    _cancel_dependency = 
CancelDependency::create_shared(_parent->operator_id());
+    _result_sink_dependency =
+            OrDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
+    _buffer_dependency =
+            ResultBufferDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
+    _cancel_dependency =
+            CancelDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     _result_sink_dependency->add_child(_cancel_dependency);
     _result_sink_dependency->add_child(_buffer_dependency);
-    _queue_dependency = 
ResultQueueDependency::create_shared(_parent->operator_id());
+    _queue_dependency =
+            ResultQueueDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     _result_sink_dependency->add_child(_queue_dependency);
 
     ((PipBufferControlBlock*)_sender.get())
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 311d2c70673..e117f55d25c 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -46,7 +46,8 @@ public:
 class ResultBufferDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultBufferDependency);
-    ResultBufferDependency(int id) : WriteDependency(id, 
"ResultBufferDependency") {}
+    ResultBufferDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultBufferDependency") {}
     ~ResultBufferDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -55,7 +56,8 @@ public:
 class ResultQueueDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultQueueDependency);
-    ResultQueueDependency(int id) : WriteDependency(id, 
"ResultQueueDependency") {}
+    ResultQueueDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultQueueDependency") {}
     ~ResultQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -64,7 +66,9 @@ public:
 class CancelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(CancelDependency);
-    CancelDependency(int id) : WriteDependency(id, "CancelDependency") { 
_ready_for_write = false; }
+    CancelDependency(int id, int node_id) : WriteDependency(id, node_id, 
"CancelDependency") {
+        _ready_for_write = false;
+    }
     ~CancelDependency() override = default;
 
     void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 601bf5b8b9f..d2953d3593b 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -122,11 +122,11 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
 
-    _source_dependency = 
OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+    _source_dependency = 
OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+                                                     
PipelineXLocalState<>::_parent->node_id());
 
-    _open_dependency = 
OpenDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
-    _source_dependency->add_child(_open_dependency);
-    _eos_dependency = 
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+    _eos_dependency = 
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+                                                   
PipelineXLocalState<>::_parent->node_id());
     _source_dependency->add_child(_eos_dependency);
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
@@ -168,7 +168,7 @@ template <typename Derived>
 Status ScanLocalState<Derived>::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    if (_open_dependency == nullptr) {
+    if (_opened) {
         return Status::OK();
     }
     RETURN_IF_ERROR(_acquire_runtime_filter());
@@ -177,12 +177,12 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) 
{
     auto status =
             _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : 
_prepare_scanners();
     if (_scanner_ctx) {
+        _finish_dependency->should_finish_after_check();
         DCHECK(_eos_dependency->read_blocked_by() != nullptr && 
_num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     }
-    _source_dependency->remove_first_child();
-    _open_dependency = nullptr;
+    _opened = true;
     return status;
 }
 
@@ -1181,11 +1181,10 @@ Status ScanLocalState<Derived>::_start_scanners(
     _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
                                                     p.limit(), 
state()->scan_queue_mem_limit(),
                                                     p._col_distribute_ids, 1);
-    _scanner_done_dependency =
-            ScannerDoneDependency::create_shared(p.operator_id(), 
_scanner_ctx.get());
+    _scanner_done_dependency = 
ScannerDoneDependency::create_shared(p.operator_id(), p.node_id());
     _source_dependency->add_child(_scanner_done_dependency);
     _data_ready_dependency =
-            DataReadyDependency::create_shared(p.operator_id(), 
_scanner_ctx.get());
+            DataReadyDependency::create_shared(p.operator_id(), p.node_id(), 
_scanner_ctx.get());
     _source_dependency->add_child(_data_ready_dependency);
 
     _scanner_ctx->set_dependency(_data_ready_dependency, 
_scanner_done_dependency,
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 68d006006f6..66543dc7ffd 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -56,58 +56,34 @@ public:
     Status try_close(RuntimeState* state) override;
 };
 
-struct OpenDependency final : public Dependency {
-public:
-    ENABLE_FACTORY_CREATOR(OpenDependency);
-    OpenDependency(int id) : Dependency(id, "OpenDependency") {}
-    void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
-    [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
-};
-
 class EosDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(EosDependency);
-    EosDependency(int id) : Dependency(id, "EosDependency") {}
+    EosDependency(int id, int node_id) : Dependency(id, node_id, 
"EosDependency") {}
     void* shared_state() override { return nullptr; }
 };
 
 class ScannerDoneDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
-    ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx)
-            : Dependency(id, "ScannerDoneDependency"), 
_scanner_ctx(scanner_ctx) {}
+    ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id, 
"ScannerDoneDependency") {}
     void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override {
-        return _scanner_ctx->done() ? nullptr : this;
-    }
-    void set_ready_for_read() override {
-        // ScannerContext is set done outside this function now and only stop 
watcher here.
-        _read_dependency_watcher.stop();
-    }
-
-private:
-    vectorized::ScannerContext* _scanner_ctx;
 };
 
 class DataReadyDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(DataReadyDependency);
-    DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx)
-            : Dependency(id, "DataReadyDependency"), _scanner_ctx(scanner_ctx) 
{}
+    DataReadyDependency(int id, int node_id, vectorized::ScannerContext* 
scanner_ctx)
+            : Dependency(id, node_id, "DataReadyDependency"), 
_scanner_ctx(scanner_ctx) {}
 
     void* shared_state() override { return nullptr; }
 
-    [[nodiscard]] Dependency* read_blocked_by() override {
+    // TODO(gabriel):
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         if (_scanner_ctx->get_num_running_scanners() == 0 && 
_scanner_ctx->should_be_scheduled()) {
             _scanner_ctx->reschedule_scanner_ctx();
         }
-        if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_read ? nullptr : this;
+        return Dependency::read_blocked_by(task);
     }
 
 private:
@@ -151,7 +127,7 @@ protected:
 
     virtual Status _init_profile() = 0;
 
-    std::shared_ptr<OpenDependency> _open_dependency;
+    std::atomic<bool> _opened {false};
     std::shared_ptr<EosDependency> _eos_dependency;
     std::shared_ptr<OrDependency> _source_dependency;
     std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 36503df2db8..de9b48362c3 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -72,6 +72,7 @@ template class SetProbeSinkOperator<false>;
 
 template <bool is_intersect>
 Status SetProbeSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, 
RuntimeState* state) {
+    DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::_name = 
"SET_PROBE_SINK_OPERATOR";
     const std::vector<std::vector<TExpr>>* result_texpr_lists;
 
     // Create result_expr_ctx_lists_ from thrift exprs.
@@ -109,11 +110,6 @@ Status 
SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
-    if (_cur_child_id > 1) {
-        
CHECK(local_state._shared_state->probe_finished_children_index[_cur_child_id - 
1])
-                << fmt::format("child with id: {} should be probed first", 
_cur_child_id);
-    }
-
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
         RETURN_IF_ERROR(_extract_probe_column(local_state, *in_block, 
local_state._probe_columns,
@@ -202,7 +198,6 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
         SetProbeSinkLocalState<is_intersect>& local_state) {
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
     auto& hash_table_variants = local_state._shared_state->hash_table_variants;
-    auto& probe_finished_children_index = 
local_state._shared_state->probe_finished_children_index;
 
     if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) {
         _refresh_hash_table(local_state);
@@ -223,7 +218,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
     } else {
         local_state._dependency->set_ready_for_read();
     }
-    probe_finished_children_index[_cur_child_id] = true;
+    local_state._shared_state->set_probe_finished_children(_cur_child_id);
 }
 
 template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 90cc792d471..52bd8aa3cf7 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -90,7 +90,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
                         },
                         *local_state._shared_state->hash_table_variants);
             }
-            
local_state._shared_state->probe_finished_children_index[_cur_child_id] = true;
+            
local_state._shared_state->set_probe_finished_children(_cur_child_id);
             if (_child_quantity == 1) {
                 local_state._dependency->set_ready_for_read();
             }
@@ -171,7 +171,6 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
     }
 
     _shared_state->child_quantity = parent._child_quantity;
-    
_shared_state->probe_finished_children_index.assign(parent._child_quantity, 
false);
 
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
     DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() == 
parent._child_quantity);
@@ -192,6 +191,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, 
RuntimeState* state) {
+    Base::_name = "SET_SINK_OPERATOR";
     const std::vector<std::vector<TExpr>>* result_texpr_lists;
 
     // Create result_expr_ctx_lists_ from thrift exprs.
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index d3840285c38..6e5e0140365 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -52,8 +52,8 @@ template class SetSourceOperator<false>;
 template <bool is_intersect>
 Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, 
LocalStateInfo& info) {
     std::shared_ptr<typename SetDependency::SharedState> ss = nullptr;
-    ss.reset(new typename SetDependency::SharedState());
     auto& deps = info.dependencys;
+    ss.reset(new typename SetDependency::SharedState(deps.size()));
     for (auto& dep : deps) {
         ((SetDependency*)dep.get())->set_shared_state(ss);
     }
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index 8e5179734cb..d1515faa1b7 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -114,7 +114,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         DCHECK(deps.size() == 1);
         DCHECK(deps.front() == nullptr);
         //child_count == 0 , we need to creat a  UnionDependency
-        deps.front() = 
std::make_shared<UnionDependency>(_parent->operator_id());
+        deps.front() =
+                std::make_shared<UnionDependency>(_parent->operator_id(), 
_parent->node_id());
         ((UnionDependency*)deps.front().get())->set_shared_state(ss);
     }
     RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 9610122bc02..be4fd6c5aea 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -126,7 +126,8 @@ PipelineFragmentContext::PipelineFragmentContext(
           _call_back(call_back),
           _is_report_on_cancel(true),
           _report_status_cb(report_status_cb),
-          _group_commit(group_commit) {
+          _group_commit(group_commit),
+          _create_time(MonotonicNanos()) {
     if (_query_ctx->get_task_group()) {
         _task_group_entity = _query_ctx->get_task_group()->task_entity();
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 85834e513f9..aa2f139c507 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -109,8 +109,6 @@ public:
 
     void close_a_pipeline();
 
-    std::string to_http_path(const std::string& file_name);
-
     void set_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
         _merge_controller_handler = handler;
@@ -145,6 +143,10 @@ public:
     }
     void refresh_next_report_time();
 
+    virtual std::string debug_string() { return ""; }
+
+    uint64_t create_time() const { return _create_time; }
+
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
     Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -222,6 +224,8 @@ private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
+
+    uint64_t _create_time;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 690c4e3419d..947f3418802 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,11 @@ public:
 
     void set_parent_profile(RuntimeProfile* profile) { _parent_profile = 
profile; }
 
+    virtual bool is_pipelineX() const { return false; }
+
+    bool is_running() { return _running.load(); }
+    void set_running(bool running) { _running = running; }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
@@ -359,5 +364,7 @@ private:
     OperatorPtr _source;
     OperatorPtr _root;
     OperatorPtr _sink;
+
+    std::atomic<bool> _running {false};
 };
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 32bd06f5983..a06fadef03b 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -21,10 +21,177 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/memory/mem_tracker.h"
 
 namespace doris::pipeline {
 
+void Dependency::add_block_task(PipelineXTask* task) {
+    // TODO(gabriel): support read dependency
+    if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == 
task) {
+        return;
+    }
+    _blocked_task.push_back(task);
+}
+
+void WriteDependency::add_write_block_task(PipelineXTask* task) {
+    DCHECK(_write_blocked_task.empty() ||
+           _write_blocked_task[_write_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    _write_blocked_task.push_back(task);
+}
+
+void FinishDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_finish_blocked_task.empty() ||
+           _finish_blocked_task[_finish_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    _finish_blocked_task.push_back(task);
+}
+
+void RuntimeFilterDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_filter_blocked_task.empty() ||
+           _filter_blocked_task[_filter_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    DCHECK(_blocked_by_rf) << "It is not allowed: task: " << 
task->debug_string()
+                           << " \n dependency: " << debug_string()
+                           << " \n state: " << 
get_state_name(task->get_state());
+    _filter_blocked_task.push_back(task);
+}
+
+void Dependency::set_ready_for_read() {
+    if (_ready_for_read) {
+        return;
+    }
+    _read_dependency_watcher.stop();
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_read) {
+            return;
+        }
+        _ready_for_read = true;
+        local_block_task.swap(_blocked_task);
+    }
+}
+
+void WriteDependency::set_ready_for_write() {
+    if (_ready_for_write) {
+        return;
+    }
+    _write_dependency_watcher.stop();
+
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_write) {
+            return;
+        }
+        _ready_for_write = true;
+        local_block_task.swap(_write_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+void FinishDependency::set_ready_to_finish() {
+    if (_ready_to_finish) {
+        return;
+    }
+    _finish_dependency_watcher.stop();
+
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_to_finish) {
+            return;
+        }
+        _ready_to_finish = true;
+        local_block_task.swap(_finish_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+Dependency* Dependency::read_blocked_by(PipelineXTask* task) {
+    if (config::enable_fuzzy_mode && !_ready_for_read &&
+        _should_log(_read_dependency_watcher.elapsed_time())) {
+        LOG(WARNING) << "========Dependency may be blocked by some reasons: " 
<< name() << " "
+                     << _node_id << " block tasks: " << _blocked_task.size()
+                     << " write block tasks: "
+                     << (is_write_dependency()
+                                 ? 
((WriteDependency*)this)->_write_blocked_task.size()
+                                 : 0)
+                     << " write done: "
+                     << (is_write_dependency() ? 
((WriteDependency*)this)->_ready_for_write.load()
+                                               : true)
+                     << "task: " << (task ? 
task->fragment_context()->debug_string() : "");
+    }
+
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready_for_read = _ready_for_read.load();
+    if (!ready_for_read && task) {
+        add_block_task(task);
+    }
+    return ready_for_read ? nullptr : this;
+}
+
+RuntimeFilterDependency* 
RuntimeFilterDependency::filter_blocked_by(PipelineXTask* task) {
+    if (!_blocked_by_rf) {
+        return nullptr;
+    }
+    std::unique_lock<std::mutex> lc(_task_lock);
+    if (*_blocked_by_rf) {
+        if (LIKELY(task)) {
+            add_block_task(task);
+        }
+        return this;
+    }
+    return nullptr;
+}
+
+FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    if (!_ready_to_finish && task) {
+        add_block_task(task);
+    }
+    return _ready_to_finish ? nullptr : this;
+}
+
+WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    const auto ready_for_write = _ready_for_write.load();
+    if (!ready_for_write && task) {
+        add_write_block_task(task);
+    }
+    return ready_for_write ? nullptr : this;
+}
+
+Dependency* OrDependency::read_blocked_by(PipelineXTask* task) {
+    // TODO(gabriel):
+    for (auto& child : _children) {
+        auto* cur_res = child->read_blocked_by(nullptr);
+        if (cur_res == nullptr) {
+            return nullptr;
+        }
+    }
+    return this;
+}
+
+WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) {
+    for (auto& child : _children) {
+        CHECK(child->is_write_dependency());
+        auto* cur_res = 
((WriteDependency*)child.get())->write_blocked_by(nullptr);
+        if (cur_res == nullptr) {
+            return nullptr;
+        }
+    }
+    return this;
+}
+
 template Status HashJoinDependency::extract_join_column<true>(
         vectorized::Block&,
         
COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<unsigned char>>&,
@@ -39,17 +206,43 @@ template Status 
HashJoinDependency::extract_join_column<false>(
 
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, 
_ready_for_read={}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
+                   _ready_for_read);
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string WriteDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "{}{}: id={}, read block task = {},write block "
+                   "task = {}, _ready_for_write = {}, _ready_for_read = {}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
+                   _write_blocked_task.size(), _ready_for_write, 
_ready_for_read);
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string FinishDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, 
_ready_to_finish = {}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id,
+                   _finish_blocked_task.size(), _ready_to_finish);
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string RuntimeFilterDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(
+            debug_string_buffer, "{}{}: id={}, block task = {}, _blocked_by_rf 
= {}, _filters = {}",
+            std::string(indentation_level * 2, ' '), _name, _node_id, 
_filter_blocked_task.size(),
+            _blocked_by_rf ? _blocked_by_rf->load() : false, _filters);
     return fmt::to_string(debug_string_buffer);
 }
 
 std::string AndDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+                   std::string(indentation_level * 2, ' '), _name, _node_id);
     for (auto& child : _children) {
         fmt::format_to(debug_string_buffer, "{}, \n", 
child->debug_string(indentation_level = 1));
     }
@@ -59,9 +252,8 @@ std::string AndDependency::debug_string(int 
indentation_level) {
 
 std::string OrDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+                   std::string(indentation_level * 2, ' '), _name, _node_id);
     for (auto& child : _children) {
         fmt::format_to(debug_string_buffer, "{}, \n", 
child->debug_string(indentation_level = 1));
     }
@@ -294,6 +486,12 @@ std::vector<uint16_t> 
HashJoinDependency::convert_block_to_null(vectorized::Bloc
     return results;
 }
 
+void SetSharedState::set_probe_finished_children(int child_id) {
+    if (child_id + 1 < probe_finished_children_dependency.size()) {
+        probe_finished_children_dependency[child_id + 
1]->set_ready_for_write();
+    }
+}
+
 template <bool BuildSide>
 Status HashJoinDependency::extract_join_column(vectorized::Block& block,
                                                
vectorized::ColumnUInt8::MutablePtr& null_map,
@@ -433,9 +631,17 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* 
runtime_filter) {
 }
 
 void RuntimeFilterDependency::sub_filters() {
-    _filters--;
-    if (_filters == 0) {
-        *_blocked_by_rf = false;
+    auto value = _filters.fetch_sub(1);
+    if (value == 1) {
+        std::vector<PipelineXTask*> local_block_task {};
+        {
+            std::unique_lock<std::mutex> lc(_task_lock);
+            *_blocked_by_rf = false;
+            local_block_task.swap(_filter_blocked_task);
+        }
+        for (auto* task : local_block_task) {
+            task->try_wake_up(this);
+        }
     }
 }
 
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index e9635253b2b..11a8975b306 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -40,19 +40,22 @@
 #include "vec/exec/vanalytic_eval_node.h"
 #include "vec/exec/vpartition_sort_node.h"
 
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
 class Dependency;
+class PipelineXTask;
 using DependencySPtr = std::shared_ptr<Dependency>;
 
-static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 10 * 1000L * 1000L * 1000L;
-static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 5 * 1000L * 1000L * 1000L;
+static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
+static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
+
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
-    Dependency(int id, std::string name) : _id(id), _name(name), 
_ready_for_read(false) {}
+    Dependency(int id, int node_id, std::string name)
+            : _id(id), _node_id(node_id), _name(std::move(name)), 
_ready_for_read(false) {}
     virtual ~Dependency() = default;
 
+    virtual bool is_or_dep() { return false; }
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
     virtual void* shared_state() = 0;
@@ -72,35 +75,19 @@ public:
     }
 
     // Which dependency current pipeline task is blocked by. `nullptr` if this 
dependency is ready.
-    [[nodiscard]] virtual Dependency* read_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_read ? nullptr : this;
-    }
+    [[nodiscard]] virtual Dependency* read_blocked_by(PipelineXTask* task = 
nullptr);
 
     // Notify downstream pipeline tasks this dependency is ready.
-    virtual void set_ready_for_read() {
-        if (_ready_for_read) {
-            return;
-        }
-        _read_dependency_watcher.stop();
-        _ready_for_read = true;
-    }
+    virtual void set_ready_for_read();
 
     // Notify downstream pipeline tasks this dependency is blocked.
     virtual void block_reading() { _ready_for_read = false; }
 
     void set_parent(std::weak_ptr<Dependency> parent) { _parent = parent; }
 
-    void add_child(std::shared_ptr<Dependency> child) {
-        _children.push_back(child);
-        child->set_parent(weak_from_this());
-    }
+    virtual void add_child(std::shared_ptr<Dependency> child) { 
_children.push_back(child); }
 
-    void remove_first_child() { _children.erase(_children.begin()); }
+    virtual void add_block_task(PipelineXTask* task);
 
 protected:
     bool _should_log(uint64_t cur_time) {
@@ -115,6 +102,7 @@ protected:
     }
 
     int _id;
+    const int _node_id;
     std::string _name;
     std::atomic<bool> _ready_for_read;
     MonotonicStopWatch _read_dependency_watcher;
@@ -124,15 +112,16 @@ protected:
     std::list<std::shared_ptr<Dependency>> _children;
 
     uint64_t _last_log_time = 0;
+    std::mutex _task_lock;
+    std::vector<PipelineXTask*> _blocked_task;
 };
 
 class WriteDependency : public Dependency {
 public:
-    WriteDependency(int id, std::string name) : Dependency(id, name), 
_ready_for_write(true) {}
+    WriteDependency(int id, int node_id, std::string name) : Dependency(id, 
node_id, name) {}
     ~WriteDependency() override = default;
 
     bool is_write_dependency() override { return true; }
-
     void start_write_watcher() {
         for (auto& child : _children) {
             CHECK(child->is_write_dependency());
@@ -145,36 +134,30 @@ public:
         return _write_dependency_watcher.elapsed_time();
     }
 
-    [[nodiscard]] virtual WriteDependency* write_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_for_write &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_write ? nullptr : this;
-    }
+    [[nodiscard]] virtual WriteDependency* write_blocked_by(PipelineXTask* 
task);
 
-    virtual void set_ready_for_write() {
-        if (_ready_for_write) {
-            return;
-        }
-        _write_dependency_watcher.stop();
-        _ready_for_write = true;
-    }
+    virtual void set_ready_for_write();
 
     virtual void block_writing() { _ready_for_write = false; }
 
+    std::string debug_string(int indentation_level = 0) override;
+    void add_write_block_task(PipelineXTask* task);
+
 protected:
-    std::atomic<bool> _ready_for_write;
+    friend class Dependency;
+    std::atomic<bool> _ready_for_write {true};
     MonotonicStopWatch _write_dependency_watcher;
+
+private:
+    std::vector<PipelineXTask*> _write_blocked_task;
 };
 
 class FinishDependency final : public Dependency {
 public:
-    FinishDependency(int id, int node_id, std::string name)
-            : Dependency(id, name), _ready_to_finish(true), _node_id(node_id) 
{}
+    FinishDependency(int id, int node_id, std::string name) : Dependency(id, 
node_id, name) {}
     ~FinishDependency() override = default;
 
+    void should_finish_after_check() { _ready_to_finish = false; }
     void start_finish_watcher() {
         for (auto& child : _children) {
             ((FinishDependency*)child.get())->start_finish_watcher();
@@ -186,31 +169,19 @@ public:
         return _finish_dependency_watcher.elapsed_time();
     }
 
-    [[nodiscard]] FinishDependency* finish_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_to_finish &&
-            _should_log(_finish_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << _node_id;
-        }
-        return _ready_to_finish ? nullptr : this;
-    }
-
-    void set_ready_to_finish() {
-        if (_ready_to_finish) {
-            return;
-        }
-        _finish_dependency_watcher.stop();
-        _ready_to_finish = true;
-    }
+    [[nodiscard]] FinishDependency* finish_blocked_by(PipelineXTask* task);
 
-    void block_finishing() { _ready_to_finish = false; }
+    void set_ready_to_finish();
 
     void* shared_state() override { return nullptr; }
+    std::string debug_string(int indentation_level = 0) override;
 
-protected:
-    std::atomic<bool> _ready_to_finish;
+    void add_block_task(PipelineXTask* task) override;
+
+private:
+    std::atomic<bool> _ready_to_finish {true};
     MonotonicStopWatch _finish_dependency_watcher;
-    const int _node_id;
+    std::vector<PipelineXTask*> _finish_blocked_task;
 };
 
 class RuntimeFilterDependency;
@@ -246,37 +217,34 @@ private:
     const int32_t _wait_time_ms;
     IRuntimeFilter* _runtime_filter;
 };
+
 class RuntimeFilterDependency final : public Dependency {
 public:
     RuntimeFilterDependency(int id, int node_id, std::string name)
-            : Dependency(id, name), _node_id(node_id) {}
-
-    RuntimeFilterDependency* filter_blocked_by() {
-        if (!_blocked_by_rf) {
-            return nullptr;
-        }
-        if (*_blocked_by_rf) {
-            return this;
-        }
-        return nullptr;
-    }
+            : Dependency(id, node_id, name) {}
+    RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
     void* shared_state() override { return nullptr; }
     void add_filters(IRuntimeFilter* runtime_filter);
     void sub_filters();
     void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
         _blocked_by_rf = blocked_by_rf;
     }
+    std::string debug_string(int indentation_level = 0) override;
+
+    void add_block_task(PipelineXTask* task) override;
 
 protected:
-    const int _node_id;
     std::atomic_int _filters;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
+
+private:
+    std::vector<PipelineXTask*> _filter_blocked_task;
 };
 
 class AndDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(AndDependency);
-    AndDependency(int id) : WriteDependency(id, "AndDependency") {}
+    AndDependency(int id, int node_id) : WriteDependency(id, node_id, 
"AndDependency") {}
 
     [[nodiscard]] std::string name() const override {
         fmt::memory_buffer debug_string_buffer;
@@ -292,19 +260,19 @@ public:
 
     std::string debug_string(int indentation_level = 0) override;
 
-    [[nodiscard]] Dependency* read_blocked_by() override {
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         for (auto& child : _children) {
-            if (auto* dep = child->read_blocked_by()) {
+            if (auto* dep = child->read_blocked_by(task)) {
                 return dep;
             }
         }
         return nullptr;
     }
 
-    [[nodiscard]] WriteDependency* write_blocked_by() override {
+    [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) 
override {
         for (auto& child : _children) {
             CHECK(child->is_write_dependency());
-            if (auto* dep = 
((WriteDependency*)child.get())->write_blocked_by()) {
+            if (auto* dep = 
((WriteDependency*)child.get())->write_blocked_by(task)) {
                 return dep;
             }
         }
@@ -315,7 +283,7 @@ public:
 class OrDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(OrDependency);
-    OrDependency(int id) : WriteDependency(id, "OrDependency") {}
+    OrDependency(int id, int node_id) : WriteDependency(id, node_id, 
"OrDependency") {}
 
     [[nodiscard]] std::string name() const override {
         fmt::memory_buffer debug_string_buffer;
@@ -331,59 +299,32 @@ public:
 
     std::string debug_string(int indentation_level = 0) override;
 
-    [[nodiscard]] Dependency* read_blocked_by() override {
-        Dependency* res = nullptr;
-        for (auto& child : _children) {
-            auto* cur_res = child->read_blocked_by();
-            if (cur_res == nullptr) {
-                return nullptr;
-            } else {
-                res = cur_res;
-            }
-        }
-        return res;
-    }
+    bool is_or_dep() override { return true; }
 
-    [[nodiscard]] WriteDependency* write_blocked_by() override {
-        WriteDependency* res = nullptr;
-        for (auto& child : _children) {
-            CHECK(child->is_write_dependency());
-            auto* cur_res = 
((WriteDependency*)child.get())->write_blocked_by();
-            if (cur_res == nullptr) {
-                return nullptr;
-            } else {
-                res = cur_res;
-            }
-        }
-        return res;
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override;
+
+    [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) 
override;
+
+    void add_child(std::shared_ptr<Dependency> child) override {
+        WriteDependency::add_child(child);
+        child->set_parent(weak_from_this());
     }
 };
 
 struct FakeSharedState {};
 struct FakeDependency final : public WriteDependency {
 public:
-    FakeDependency(int id) : WriteDependency(id, "FakeDependency") {}
+    FakeDependency(int id, int node_id) : WriteDependency(id, node_id, 
"FakeDependency") {}
     using SharedState = FakeSharedState;
     void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
-    [[nodiscard]] WriteDependency* write_blocked_by() override { return 
nullptr; }
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { 
return nullptr; }
+    [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) 
override {
+        return nullptr;
+    }
     [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
     [[nodiscard]] int64_t write_watcher_elapse_time() override { return 0; }
 };
 
-class AsyncWriterSinkDependency : public WriteDependency {
-public:
-    AsyncWriterSinkDependency(int id) : WriteDependency(id, 
"AsyncWriterSinkDependency") {}
-    using SharedState = FakeSharedState;
-    void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
-    [[nodiscard]] WriteDependency* write_blocked_by() override { return 
_call_func(); }
-    void set_write_blocked_by(std::function<WriteDependency*()> call_func) {
-        _call_func = call_func;
-    }
-    std::function<WriteDependency*()> _call_func;
-};
-
 struct AggSharedState {
 public:
     AggSharedState() {
@@ -412,7 +353,7 @@ public:
 class AggDependency final : public WriteDependency {
 public:
     using SharedState = AggSharedState;
-    AggDependency(int id) : WriteDependency(id, "AggDependency") {
+    AggDependency(int id, int node_id) : WriteDependency(id, node_id, 
"AggDependency") {
         _mem_tracker = std::make_unique<MemTracker>("AggregateOperator:");
     }
     ~AggDependency() override = default;
@@ -421,20 +362,20 @@ public:
         if (_is_streaming_agg_state()) {
             if (_agg_state.data_queue->_cur_blocks_nums_in_queue[0] == 0 &&
                 !_agg_state.data_queue->_is_finished[0]) {
-                _ready_for_read = false;
+                Dependency::block_reading();
             }
         } else {
-            _ready_for_read = false;
+            Dependency::block_reading();
         }
     }
 
     void block_writing() override {
         if (_is_streaming_agg_state()) {
             if (!_agg_state.data_queue->has_enough_space_to_push()) {
-                _ready_for_write = false;
+                WriteDependency::block_writing();
             }
         } else {
-            _ready_for_write = false;
+            WriteDependency::block_writing();
         }
     }
 
@@ -518,7 +459,7 @@ public:
 class SortDependency final : public WriteDependency {
 public:
     using SharedState = SortSharedState;
-    SortDependency(int id) : WriteDependency(id, "SortDependency") {}
+    SortDependency(int id, int node_id) : WriteDependency(id, node_id, 
"SortDependency") {}
     ~SortDependency() override = default;
     void* shared_state() override { return (void*)&_sort_state; };
 
@@ -538,15 +479,15 @@ public:
 class UnionDependency final : public WriteDependency {
 public:
     using SharedState = UnionSharedState;
-    UnionDependency(int id) : WriteDependency(id, "UnionDependency") {}
+    UnionDependency(int id, int node_id) : WriteDependency(id, node_id, 
"UnionDependency") {}
     ~UnionDependency() override = default;
+
     void* shared_state() override { return (void*)_union_state.get(); }
     void set_shared_state(std::shared_ptr<UnionSharedState> union_state) {
         _union_state = union_state;
     }
-    void set_ready_for_write() override {}
-    void set_ready_for_read() override {}
-    [[nodiscard]] Dependency* read_blocked_by() override {
+
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         if (_union_state->child_count() == 0) {
             return nullptr;
         }
@@ -566,27 +507,20 @@ private:
 struct MultiCastSharedState {
 public:
     MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int 
cast_sender_count)
-            : multi_cast_data_streamer(row_desc, pool, cast_sender_count) {}
+            : multi_cast_data_streamer(row_desc, pool, cast_sender_count, 
true) {}
     pipeline::MultiCastDataStreamer multi_cast_data_streamer;
 };
 
 class MultiCastDependency final : public WriteDependency {
 public:
     using SharedState = MultiCastSharedState;
-    MultiCastDependency(int id) : WriteDependency(id, "MultiCastDependency") {}
+    MultiCastDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "MultiCastDependency") {}
     ~MultiCastDependency() override = default;
     void* shared_state() override { return (void*)_multi_cast_state.get(); };
     void set_shared_state(std::shared_ptr<MultiCastSharedState> 
multi_cast_state) {
         _multi_cast_state = multi_cast_state;
     }
-    WriteDependency* read_blocked_by() override {
-        if 
(_multi_cast_state->multi_cast_data_streamer.can_read(_consumer_id)) {
-            return nullptr;
-        }
-        return this;
-    }
-    int _consumer_id {};
-    void set_consumer_id(int consumer_id) { _consumer_id = consumer_id; }
 
 private:
     std::shared_ptr<MultiCastSharedState> _multi_cast_state;
@@ -617,7 +551,7 @@ public:
 class AnalyticDependency final : public WriteDependency {
 public:
     using SharedState = AnalyticSharedState;
-    AnalyticDependency(int id) : WriteDependency(id, "AnalyticDependency") {}
+    AnalyticDependency(int id, int node_id) : WriteDependency(id, node_id, 
"AnalyticDependency") {}
     ~AnalyticDependency() override = default;
 
     void* shared_state() override { return (void*)&_analytic_state; };
@@ -675,7 +609,7 @@ struct HashJoinSharedState : public JoinSharedState {
 class HashJoinDependency final : public WriteDependency {
 public:
     using SharedState = HashJoinSharedState;
-    HashJoinDependency(int id) : WriteDependency(id, "HashJoinDependency") {}
+    HashJoinDependency(int id, int node_id) : WriteDependency(id, node_id, 
"HashJoinDependency") {}
     ~HashJoinDependency() override = default;
 
     void* shared_state() override { return (void*)&_join_state; }
@@ -707,7 +641,8 @@ struct NestedLoopJoinSharedState : public JoinSharedState {
 class NestedLoopJoinDependency final : public WriteDependency {
 public:
     using SharedState = NestedLoopJoinSharedState;
-    NestedLoopJoinDependency(int id) : WriteDependency(id, 
"NestedLoopJoinDependency") {}
+    NestedLoopJoinDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "NestedLoopJoinDependency") {}
     ~NestedLoopJoinDependency() override = default;
 
     void* shared_state() override { return (void*)&_join_state; }
@@ -727,22 +662,28 @@ public:
 class PartitionSortDependency final : public WriteDependency {
 public:
     using SharedState = PartitionSortNodeSharedState;
-    PartitionSortDependency(int id) : WriteDependency(id, 
"PartitionSortDependency"), _eos(false) {}
+    PartitionSortDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "PartitionSortDependency"), 
_eos(false) {}
     ~PartitionSortDependency() override = default;
     void* shared_state() override { return (void*)&_partition_sort_state; };
-    void set_ready_for_write() override {}
-    void block_writing() override {}
+    void set_ready_for_write() override {
+        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
+    }
+    void block_writing() override {
+        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
+    }
 
-    [[nodiscard]] Dependency* read_blocked_by() override {
-        if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
+    void block_reading() override {
+        if (_eos) {
+            return;
         }
-        return _ready_for_read || _eos ? nullptr : this;
+        Dependency::block_reading();
     }
 
-    void set_eos() { _eos = true; }
+    void set_eos() {
+        _eos = true;
+        WriteDependency::set_ready_for_read();
+    }
 
 private:
     PartitionSortNodeSharedState _partition_sort_state;
@@ -752,12 +693,17 @@ private:
 class AsyncWriterDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
-    AsyncWriterDependency(int id) : WriteDependency(id, 
"AsyncWriterDependency") {}
+    AsyncWriterDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "AsyncWriterDependency") {}
     ~AsyncWriterDependency() override = default;
     void* shared_state() override { return nullptr; }
 };
 
+class SetDependency;
+
 struct SetSharedState {
+public:
+    SetSharedState(int num_deps) { 
probe_finished_children_dependency.resize(num_deps, nullptr); }
     /// default init
     //record memory during running
     int64_t mem_used = 0;
@@ -782,14 +728,15 @@ struct SetSharedState {
     /// init in build side
     int child_quantity;
     vectorized::VExprContextSPtrs build_child_exprs;
-    std::vector<bool> probe_finished_children_index; // use in probe side
+    std::vector<SetDependency*> probe_finished_children_dependency;
 
     /// init in probe side
     std::vector<vectorized::VExprContextSPtrs> probe_child_exprs_lists;
 
     std::atomic<bool> ready_for_read = false;
 
-public:
+    void set_probe_finished_children(int child_id);
+
     /// called in setup_local_state
     void hash_table_init() {
         if (child_exprs_lists[0].size() == 1 && (!build_not_ignore_null[0])) {
@@ -845,28 +792,24 @@ public:
 class SetDependency final : public WriteDependency {
 public:
     using SharedState = SetSharedState;
-    SetDependency(int id) : WriteDependency(id, "SetDependency") {}
+    SetDependency(int id, int node_id) : WriteDependency(id, node_id, 
"SetDependency") {}
     ~SetDependency() override = default;
     void* shared_state() override { return (void*)_set_state.get(); }
 
     void set_shared_state(std::shared_ptr<SetSharedState> set_state) { 
_set_state = set_state; }
 
     // Which dependency current pipeline task is blocked by. `nullptr` if this 
dependency is ready.
-    [[nodiscard]] Dependency* read_blocked_by() override {
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
             _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
+                         << id() << " " << _node_id << " block tasks: " << 
_blocked_task.size();
         }
-        return _set_state->ready_for_read ? nullptr : this;
-    }
-
-    [[nodiscard]] WriteDependency* write_blocked_by() override {
-        if (is_set_probe) {
-            DCHECK((_cur_child_id - 1) < 
_set_state->probe_finished_children_index.size());
-            return _set_state->probe_finished_children_index[_cur_child_id - 
1] ? nullptr : this;
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (!_set_state->ready_for_read && task) {
+            add_block_task(task);
         }
-        return nullptr;
+        return _set_state->ready_for_read ? nullptr : this;
     }
 
     // Notify downstream pipeline tasks this dependency is ready.
@@ -877,15 +820,16 @@ public:
         _read_dependency_watcher.stop();
         _set_state->ready_for_read = true;
     }
+
     void set_cur_child_id(int id) {
-        _cur_child_id = id;
-        is_set_probe = true;
+        _set_state->probe_finished_children_dependency[id] = this;
+        if (id != 0) {
+            block_writing();
+        }
     }
 
 private:
     std::shared_ptr<SetSharedState> _set_state;
-    int _cur_child_id;
-    bool is_set_probe {false};
 };
 
 using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
@@ -894,14 +838,37 @@ struct LocalExchangeSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
+    void add_running_sink_operators() { running_sink_operators++; }
+    void sub_running_sink_operators() {
+        auto val = running_sink_operators.fetch_sub(1);
+        if (val == 1) {
+            _set_ready_for_read();
+        }
+    }
+    void _set_ready_for_read() {
+        for (auto* dep : source_dependencies) {
+            DCHECK(dep);
+            dep->set_ready_for_read();
+        }
+    }
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+        source_dependencies[channel_id] = dep;
+        dep->block_reading();
+    }
+    void set_ready_for_read(int channel_id) {
+        auto* dep = source_dependencies[channel_id];
+        DCHECK(dep);
+        dep->set_ready_for_read();
+    }
 };
 
 struct LocalExchangeDependency final : public WriteDependency {
 public:
     using SharedState = LocalExchangeSharedState;
-    LocalExchangeDependency(int id)
-            : WriteDependency(id, "LocalExchangeDependency"),
+    LocalExchangeDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "LocalExchangeDependency"),
               _local_exchange_shared_state(nullptr) {}
     ~LocalExchangeDependency() override = default;
     void* shared_state() override { return _local_exchange_shared_state.get(); 
}
@@ -910,27 +877,8 @@ public:
         _local_exchange_shared_state = state;
     }
 
-    void set_channel_id(int channel_id) { _channel_id = channel_id; }
-
-    Dependency* read_blocked_by() override {
-        if (config::enable_fuzzy_mode && !_should_run() &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id();
-        }
-        return _should_run() ? nullptr : this;
-    }
-
 private:
-    bool _should_run() const {
-        DCHECK(_local_exchange_shared_state != nullptr);
-        return 
_local_exchange_shared_state->data_queue[_channel_id].size_approx() > 0 ||
-               _local_exchange_shared_state->running_sink_operators == 0;
-    }
-
     std::shared_ptr<LocalExchangeSharedState> _local_exchange_shared_state;
-    int _channel_id;
 };
 
-} // namespace pipeline
-} // namespace doris
+} // namespace doris::pipeline
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index eba75c0fc1f..a793a22761f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,7 +27,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
     auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
     RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
-    _shared_state->running_sink_operators++;
+    _shared_state->add_running_sink_operators();
     return Status::OK();
 }
 
@@ -60,6 +60,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* 
state,
         if (size > 0) {
             data_queue[i].enqueue({new_block, {row_idx, start, size}});
         }
+        _shared_state->set_ready_for_read(i);
     }
 
     return Status::OK();
@@ -83,7 +84,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block*
     }
 
     if (source_state == SourceState::FINISHED) {
-        local_state._shared_state->running_sink_operators--;
+        local_state._shared_state->sub_running_sink_operators();
     }
 
     return Status::OK();
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 127e14dba6b..a1bff19cb2b 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -27,7 +27,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     _shared_state = (LocalExchangeSharedState*)_dependency->shared_state();
     DCHECK(_shared_state != nullptr);
     _channel_id = info.task_idx;
-    _dependency->set_channel_id(_channel_id);
+    _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
     _get_block_failed_counter =
             ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", 
TUnit::UNIT, 1);
     _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 9e6df06da01..cc21a389651 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -288,7 +288,7 @@ void 
DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
     if constexpr (!std::is_same_v<typename LocalStateType::Dependency, 
FakeDependency>) {
         auto& dests = dests_id();
         for (auto& dest_id : dests) {
-            dependency.push_back(std::make_shared<DependencyType>(dest_id));
+            dependency.push_back(std::make_shared<DependencyType>(dest_id, 
_node_id));
         }
     } else {
         dependency.push_back(nullptr);
@@ -341,7 +341,7 @@ Status 
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
         }
     } else {
         auto& deps = info.dependencys;
-        deps.front() = std::make_shared<FakeDependency>(0);
+        deps.front() = std::make_shared<FakeDependency>(0, 0);
         _dependency = (DependencyType*)deps.front().get();
     }
 
@@ -403,7 +403,7 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
         }
     } else {
         auto& deps = info.dependencys;
-        deps.front() = std::make_shared<FakeDependency>(0);
+        deps.front() = std::make_shared<FakeDependency>(0, 0);
         _dependency = (DependencyType*)deps.front().get();
     }
     _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", 
TUnit::UNIT, 1);
@@ -481,15 +481,14 @@ Status AsyncWriterSink<Writer, 
Parent>::init(RuntimeState* state, LocalSinkState
         RETURN_IF_ERROR(
                 _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
     }
-    
static_cast<AsyncWriterSinkDependency*>(_dependency)->set_write_blocked_by([this]()
 {
-        return this->write_blocked_by();
-    });
     _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
-    _async_writer_dependency = 
AsyncWriterDependency::create_shared(_parent->operator_id());
+    _async_writer_dependency =
+            AsyncWriterDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
 
     _wait_for_dependency_timer =
             ADD_TIMER(_profile, "WaitForDependency[" + 
_async_writer_dependency->name() + "]Time");
+    _finish_dependency->should_finish_after_check();
     return Status::OK();
 }
 
@@ -507,8 +506,8 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* 
state, vectorized::Bl
 }
 
 template <typename Writer, typename Parent>
-WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by() {
-    return _writer->write_blocked_by();
+WriteDependency* AsyncWriterSink<Writer, 
Parent>::write_blocked_by(PipelineXTask* task) {
+    return _writer->write_blocked_by(task);
 }
 
 template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 265579bfaab..4e8f030d505 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -625,9 +625,9 @@ public:
 };
 
 template <typename Writer, typename Parent>
-class AsyncWriterSink : public 
PipelineXSinkLocalState<AsyncWriterSinkDependency> {
+class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {
 public:
-    using Base = PipelineXSinkLocalState<AsyncWriterSinkDependency>;
+    using Base = PipelineXSinkLocalState<FakeDependency>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {}
 
@@ -637,8 +637,8 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* block, SourceState 
source_state);
 
-    WriteDependency* write_blocked_by();
-
+    WriteDependency* write_blocked_by(PipelineXTask* task);
+    WriteDependency* dependency() override { return 
_async_writer_dependency.get(); }
     Status close(RuntimeState* state, Status exec_status) override;
 
     Status try_close(RuntimeState* state, Status exec_status) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7113989ee1e..28e1be496f4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -422,8 +422,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             auto task = std::make_unique<PipelineXTask>(
                     _pipelines[pip_idx], _total_tasks++, 
_runtime_states[i].get(), this,
                     _runtime_states[i]->runtime_profile(),
-                    _op_id_to_le_state.count(
-                            
_pipelines[pip_idx]->operator_xs().front()->operator_id()) > 0
+                    _op_id_to_le_state.contains(
+                            
_pipelines[pip_idx]->operator_xs().front()->operator_id())
                             ? _op_id_to_le_state
                                       
[_pipelines[pip_idx]->operator_xs().front()->operator_id()]
                             : nullptr,
@@ -592,13 +592,14 @@ Status 
PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
     _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
     DataSinkOperatorXPtr sink;
-    sink.reset(new LocalExchangeSinkOperatorX(
-            local_exchange_id, _runtime_state->query_parallel_instance_num(), 
texprs));
+    auto num_instances = _runtime_state->query_parallel_instance_num();
+    sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, 
num_instances, texprs));
     RETURN_IF_ERROR(cur_pipe->set_sink(sink));
     RETURN_IF_ERROR(cur_pipe->sink_x()->init());
 
     auto shared_state = LocalExchangeSharedState::create_shared();
-    
shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num());
+    shared_state->data_queue.resize(num_instances);
+    shared_state->source_dependencies.resize(num_instances, nullptr);
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
     return Status::OK();
 }
@@ -1029,4 +1030,18 @@ bool 
PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableS
     return false;
 }
 
+std::string PipelineXFragmentContext::debug_string() {
+    fmt::memory_buffer debug_string_buffer;
+    for (size_t j = 0; j < _tasks.size(); j++) {
+        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
+        for (size_t i = 0; i < _tasks[j].size(); i++) {
+            if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) {
+                continue;
+            }
+            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, 
_tasks[j][i]->debug_string());
+        }
+    }
+
+    return fmt::to_string(debug_string_buffer);
+}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 6fa91aedf12..9e7ff42219f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -115,6 +115,8 @@ public:
 
     [[nodiscard]] int max_operator_id() const { return _operator_id; }
 
+    std::string debug_string() override;
+
 private:
     void _close_action() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 140fecbb1d5..7295f38a124 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -91,14 +91,14 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
     }
 
     _block = doris::vectorized::Block::create_unique();
-    RETURN_IF_ERROR(extract_dependencies());
+    RETURN_IF_ERROR(_extract_dependencies());
     // We should make sure initial state for task are runnable so that we can 
do some preparation jobs (e.g. initialize runtime filters).
     set_state(PipelineTaskState::RUNNABLE);
     _prepared = true;
     return Status::OK();
 }
 
-Status PipelineXTask::extract_dependencies() {
+Status PipelineXTask::_extract_dependencies() {
     for (auto op : _operators) {
         auto result = _state->get_local_state_result(op->operator_id());
         if (!result) {
@@ -179,7 +179,22 @@ Status PipelineXTask::_open() {
     SCOPED_TIMER(_open_timer);
     _dry_run = _sink->should_dry_run(_state);
     for (auto& o : _operators) {
-        
RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state));
+        auto* local_state = _state->get_local_state(o->operator_id());
+        for (size_t i = 0; i < 2; i++) {
+            auto st = local_state->open(_state);
+            if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+                _blocked_dep = _filter_dependency->filter_blocked_by(this);
+                if (_blocked_dep) {
+                    set_state(PipelineTaskState::BLOCKED_FOR_RF);
+                    set_use_blocking_queue(false);
+                    RETURN_IF_ERROR(st);
+                } else if (i == 1) {
+                    CHECK(false) << debug_string();
+                }
+            } else {
+                break;
+            }
+        }
     }
     
RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state));
     _opened = true;
@@ -204,10 +219,6 @@ Status PipelineXTask::execute(bool* eos) {
             SCOPED_RAW_TIMER(&time_spent);
             auto st = _open();
             if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
-                set_state(PipelineTaskState::BLOCKED_FOR_RF);
-                return Status::OK();
-            } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
-                set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
                 return Status::OK();
             }
             RETURN_IF_ERROR(st);
@@ -326,17 +337,10 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(_state->fragment_instance_id()));
 
-    fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
-                   PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
-    {
-        std::stringstream profile_ss;
-        _fresh_profile_counter();
-        _task_profile->pretty_print(&profile_ss, "");
-        fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
-    }
-    fmt::format_to(debug_string_buffer,
-                   "PipelineTask[this = {}, state = {}]\noperators: ", 
(void*)this,
-                   get_state_name(_cur_state));
+    fmt::format_to(
+            debug_string_buffer,
+            "PipelineTask[this = {}, state = {}, data state = {}, dry run = 
{}]\noperators: ",
+            (void*)this, get_state_name(_cur_state), (int)_data_state, 
_dry_run);
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(
                 debug_string_buffer, "\n{}",
@@ -345,7 +349,42 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "\n{}",
                    _opened ? _sink->debug_string(_state, _operators.size())
                            : _sink->debug_string(_operators.size()));
+    fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
+    for (size_t i = 0; i < _read_dependencies.size(); i++) {
+        fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
+                       _read_dependencies[i]->debug_string());
+    }
+
+    fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
+    fmt::format_to(debug_string_buffer, "{}\n", 
_write_dependencies->debug_string());
+
+    fmt::format_to(debug_string_buffer, "Runtime Filter Dependency 
Information: \n");
+    fmt::format_to(debug_string_buffer, "{}\n", 
_filter_dependency->debug_string());
+
+    fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
+    for (size_t i = 0; i < _finish_dependencies.size(); i++) {
+        fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
+                       _finish_dependencies[i]->debug_string());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
+void PipelineXTask::try_wake_up(Dependency* wake_up_dep) {
+    // call by dependency
+    VecDateTimeValue now = VecDateTimeValue::local_time();
+    // TODO(gabriel): task will never be wake up if canceled / timeout
+    if (query_context()->is_cancelled()) {
+        _make_run();
+        return;
+    }
+    if (query_context()->is_timeout(now)) {
+        query_context()->cancel(true, "", Status::Cancelled(""));
+    }
+    _make_run();
+}
+
+void PipelineXTask::_make_run() {
+    static_cast<void>(get_task_queue()->push_back(this));
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 90fdda921f0..bc50f1e89de 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -74,51 +74,21 @@ public:
         if (_dry_run) {
             return true;
         }
-        for (auto* op_dep : _read_dependencies) {
-            auto* dep = op_dep->read_blocked_by();
-            if (dep != nullptr) {
-                dep->start_read_watcher();
-                push_blocked_task_to_dependency(dep);
-                return false;
-            }
-        }
-        return true;
+        return _read_blocked_dependency() == nullptr;
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
-        auto* dep = _filter_dependency->filter_blocked_by();
-        if (dep != nullptr) {
-            push_blocked_task_to_dependency(dep);
-            return false;
-        }
-        return true;
+        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
+        return false;
     }
 
-    bool sink_can_write() override {
-        auto* dep = _write_dependencies->write_blocked_by();
-        if (dep != nullptr) {
-            dep->start_write_watcher();
-            push_blocked_task_to_dependency(dep);
-            return false;
-        }
-        return true;
-    }
+    bool sink_can_write() override { return _write_blocked_dependency() == 
nullptr; }
 
     Status finalize() override;
 
     std::string debug_string() override;
 
-    bool is_pending_finish() override {
-        for (auto* fin_dep : _finish_dependencies) {
-            auto* dep = fin_dep->finish_blocked_by();
-            if (dep != nullptr) {
-                dep->start_finish_watcher();
-                push_blocked_task_to_dependency(dep);
-                return true;
-            }
-        }
-        return false;
-    }
+    bool is_pending_finish() override { return _finish_blocked_dependency() != 
nullptr; }
 
     std::vector<DependencySPtr>& get_downstream_dependency() { return 
_downstream_dependency; }
 
@@ -147,9 +117,9 @@ public:
         return _upstream_dependency[id];
     }
 
-    Status extract_dependencies();
+    bool is_pipelineX() const override { return true; }
 
-    void push_blocked_task_to_dependency(Dependency* dep) {}
+    void try_wake_up(Dependency* wake_up_dep);
 
     DataSinkOperatorXPtr sink() const { return _sink; }
 
@@ -157,7 +127,60 @@ public:
 
     OperatorXs operatorXs() { return _operators; }
 
+    bool push_blocked_task_to_queue() {
+        /**
+         * Push task into blocking queue if:
+         * 1. `_use_blocking_queue` is true.
+         * 2. Or this task is blocked by FE two phase execution 
(BLOCKED_FOR_DEPENDENCY).
+         */
+        return _use_blocking_queue || get_state() == 
PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
+    }
+    void set_use_blocking_queue(bool use_blocking_queue) {
+        if (_blocked_dep->is_or_dep()) {
+            _use_blocking_queue = true;
+            return;
+        }
+        _use_blocking_queue = use_blocking_queue;
+    }
+
 private:
+    Dependency* _write_blocked_dependency() {
+        _blocked_dep = _write_dependencies->write_blocked_by(this);
+        if (_blocked_dep != nullptr) {
+            set_use_blocking_queue(false);
+            static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
+            return _blocked_dep;
+        }
+        return nullptr;
+    }
+
+    Dependency* _finish_blocked_dependency() {
+        for (auto* fin_dep : _finish_dependencies) {
+            _blocked_dep = fin_dep->finish_blocked_by(this);
+            if (_blocked_dep != nullptr) {
+                set_use_blocking_queue(false);
+                
static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
+                return _blocked_dep;
+            }
+        }
+        return nullptr;
+    }
+
+    Dependency* _read_blocked_dependency() {
+        for (auto* op_dep : _read_dependencies) {
+            _blocked_dep = op_dep->read_blocked_by(this);
+            if (_blocked_dep != nullptr) {
+                // TODO(gabriel):
+                set_use_blocking_queue(true);
+                _blocked_dep->start_read_watcher();
+                return _blocked_dep;
+            }
+        }
+        return nullptr;
+    }
+
+    Status _extract_dependencies();
+    void _make_run();
     void set_close_pipeline_time() override {}
     void _init_profile() override;
     void _fresh_profile_counter() override;
@@ -180,6 +203,10 @@ private:
     std::shared_ptr<LocalExchangeSharedState> _local_exchange_state;
     int _task_idx;
     bool _dry_run = false;
+
+    Dependency* _blocked_dep {nullptr};
+
+    std::atomic<bool> _use_blocking_queue {true};
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index aa3891a5a2c..9ce2711c27a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -34,6 +34,8 @@
 #include "common/logging.h"
 #include "common/signal_handler.h"
 #include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
 #include "runtime/query_context.h"
@@ -75,6 +77,11 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* 
task) {
         return Status::InternalError("BlockedTaskScheduler shutdown");
     }
     std::unique_lock<std::mutex> lock(_task_mutex);
+    if (task->is_pipelineX() && 
!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
+        // put this task into current dependency's blocking queue and wait for 
event notification
+        // instead of using a separate BlockedTaskScheduler.
+        return Status::OK();
+    }
     _blocked_tasks.push_back(task);
     _task_cond.notify_one();
     return Status::OK();
@@ -222,24 +229,30 @@ void TaskScheduler::_do_work(size_t index) {
         if (!task) {
             continue;
         }
+        if (task->is_pipelineX() && task->is_running()) {
+            static_cast<void>(_task_queue->push_back(task, index));
+            continue;
+        }
+        task->set_running(true);
         task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
         signal::query_id_hi = fragment_ctx->get_query_id().hi;
         signal::query_id_lo = fragment_ctx->get_query_id().lo;
         bool canceled = fragment_ctx->is_canceled();
 
-        auto check_state = task->get_state();
-        if (check_state == PipelineTaskState::PENDING_FINISH) {
-            DCHECK(!task->is_pending_finish()) << "must not pending close " << 
task->debug_string();
+        auto state = task->get_state();
+        if (state == PipelineTaskState::PENDING_FINISH) {
+            DCHECK(task->is_pipelineX() || !task->is_pending_finish())
+                    << "must not pending close " << task->debug_string();
             Status exec_status = 
fragment_ctx->get_query_context()->exec_status();
             _try_close_task(task,
                             canceled ? PipelineTaskState::CANCELED : 
PipelineTaskState::FINISHED,
                             exec_status);
             continue;
         }
-        DCHECK(check_state != PipelineTaskState::FINISHED &&
-               check_state != PipelineTaskState::CANCELED)
-                << "task already finish";
+
+        DCHECK(state != PipelineTaskState::FINISHED && state != 
PipelineTaskState::CANCELED)
+                << "task already finish: " << task->debug_string();
 
         if (canceled) {
             // may change from pending FINISH,should called cancel
@@ -253,7 +266,13 @@ void TaskScheduler::_do_work(size_t index) {
             continue;
         }
 
-        DCHECK(check_state == PipelineTaskState::RUNNABLE);
+        if (task->is_pipelineX()) {
+            task->set_state(PipelineTaskState::RUNNABLE);
+        }
+
+        DCHECK(task->is_pipelineX() || task->get_state() == 
PipelineTaskState::RUNNABLE)
+                << "state:" << get_state_name(task->get_state())
+                << " task: " << task->debug_string();
         // task exec
         bool eos = false;
         auto status = Status::OK();
@@ -313,6 +332,7 @@ void TaskScheduler::_do_work(size_t index) {
         }
 
         auto pipeline_state = task->get_state();
+        task->set_running(false);
         switch (pipeline_state) {
         case PipelineTaskState::BLOCKED_FOR_SOURCE:
         case PipelineTaskState::BLOCKED_FOR_SINK:
@@ -324,7 +344,8 @@ void TaskScheduler::_do_work(size_t index) {
             static_cast<void>(_task_queue->push_back(task, index));
             break;
         default:
-            DCHECK(false) << "error state after run task, " << 
get_state_name(pipeline_state);
+            DCHECK(false) << "error state after run task, " << 
get_state_name(pipeline_state)
+                          << " task: " << task->debug_string();
             break;
         }
     }
@@ -344,21 +365,24 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state,
         cancel();
         // Call `close` if `try_close` failed to make sure allocated resources 
are released
         static_cast<void>(task->close(exec_status));
-    } else if (!task->is_pending_finish()) {
-        status = task->close(exec_status);
-        if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            cancel();
-        }
-    }
-
-    if (task->is_pending_finish()) {
+    } else if (!task->is_pipelineX() && task->is_pending_finish()) {
         task->set_state(PipelineTaskState::PENDING_FINISH);
         static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+        task->set_running(false);
         return;
+    } else if (task->is_pending_finish()) {
+        task->set_state(PipelineTaskState::PENDING_FINISH);
+        task->set_running(false);
+        return;
+    }
+    status = task->close(exec_status);
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
+        cancel();
     }
     task->set_state(state);
     task->set_close_pipeline_time();
     task->release_dependency();
+    task->set_running(false);
     task->fragment_context()->close_a_pipeline();
 }
 
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9b85ec420ee..070f2b6a5a7 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -65,7 +65,6 @@ private:
 
     static constexpr auto EMPTY_TIMES_TO_YIELD = 64;
 
-private:
     void _schedule();
     void _make_task_run(std::list<PipelineTask*>& local_tasks,
                         std::list<PipelineTask*>::iterator& task_itr,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e4b703e792f..a7a89bfd421 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -797,6 +797,24 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
     return Status::OK();
 }
 
+std::string FragmentMgr::dump_pipeline_tasks() {
+    fmt::memory_buffer debug_string_buffer;
+    auto t = MonotonicNanos();
+    size_t i = 0;
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are 
still running!\n",
+                       _pipeline_map.size());
+        for (auto& it : _pipeline_map) {
+            fmt::format_to(debug_string_buffer, "No.{} (elapse time = {}, 
InstanceId = {}) : {}\n",
+                           i, t - it.second->create_time(), print_id(it.first),
+                           it.second->debug_string());
+            i++;
+        }
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                        const FinishCallback& cb) {
     VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment 
params is "
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 93f010a5ecd..08f95bd3357 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -143,6 +143,8 @@ public:
         return _query_ctx_map.size();
     }
 
+    std::string dump_pipeline_tasks();
+
 private:
     void cancel_unlocked_impl(const TUniqueId& id, const 
PPlanFragmentCancelReason& reason,
                               const std::unique_lock<std::mutex>& state_lock, 
bool is_pipeline,
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index c4f24cfdf8f..cb8f275574e 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -41,6 +41,7 @@
 #include "http/action/meta_action.h"
 #include "http/action/metrics_action.h"
 #include "http/action/pad_rowset_action.h"
+#include "http/action/pipeline_task_action.h"
 #include "http/action/pprof_actions.h"
 #include "http/action/reload_tablet_action.h"
 #include "http/action/reset_rpc_channel_action.h"
@@ -165,6 +166,11 @@ Status HttpService::start() {
     HealthAction* health_action = _pool.add(new HealthAction());
     _ev_http_server->register_handler(HttpMethod::GET, "/api/health", 
health_action);
 
+    // Register BE health action
+    PipelineTaskAction* pipeline_task_action = _pool.add(new 
PipelineTaskAction());
+    _ev_http_server->register_handler(HttpMethod::GET, 
"/api/running_pipeline_tasks",
+                                      pipeline_task_action);
+
     // Register Tablets Info action
     TabletsInfoAction* tablets_info_action =
             _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, 
TPrivilegeType::ADMIN));
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index d3688507fcd..ef6a9d415de 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -65,6 +65,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, 
doris::vectorized::V
     ctx_id = UniqueId::gen_uid().to_string();
     if (_scanners.empty()) {
         _is_finished = true;
+        _set_scanner_done();
     }
     if (limit < 0) {
         limit = -1;
@@ -135,10 +136,6 @@ Status ScannerContext::init() {
     // 4. This ctx will be submitted to the scanner scheduler right after init.
     // So set _num_scheduling_ctx to 1 here.
     _num_scheduling_ctx = 1;
-    if (_finish_dependency) {
-        std::lock_guard l(_transfer_lock);
-        _finish_dependency->block_finishing();
-    }
 
     _num_unfinished_scanners = _scanners.size();
 
@@ -230,9 +227,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         } else {
             set_status_on_error(state, false);
         }
@@ -304,12 +298,11 @@ Status ScannerContext::validate_block_schema(Block* 
block) {
 }
 
 void ScannerContext::set_should_stop() {
-    if (_scanner_done_dependency) {
-        _scanner_done_dependency->set_ready_for_read();
-    }
     std::lock_guard l(_transfer_lock);
     _should_stop = true;
+    _set_scanner_done();
     _blocks_queue_added_cv.notify_one();
+    set_ready_to_finish();
 }
 
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
@@ -320,19 +313,20 @@ void ScannerContext::inc_num_running_scanners(int32_t 
inc) {
 void ScannerContext::dec_num_scheduling_ctx() {
     std::lock_guard l(_transfer_lock);
     _num_scheduling_ctx--;
-    if (_finish_dependency) {
-        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-            _finish_dependency->set_ready_to_finish();
-        } else {
-            _finish_dependency->block_finishing();
-        }
-    }
-
+    set_ready_to_finish();
     if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
         _ctx_finish_cv.notify_one();
     }
 }
 
+void ScannerContext::set_ready_to_finish() {
+    // `_should_stop == true` means this task has already ended and wait for 
pending finish now.
+    if (_finish_dependency && _should_stop && _num_running_scanners == 0 &&
+        _num_scheduling_ctx == 0) {
+        _finish_dependency->set_ready_to_finish();
+    }
+}
+
 bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
@@ -342,10 +336,8 @@ bool ScannerContext::set_status_on_error(const Status& 
status, bool need_lock) {
         _process_status = status;
         _status_error = true;
         _blocks_queue_added_cv.notify_one();
-        if (_scanner_done_dependency) {
-            _scanner_done_dependency->set_ready_for_read();
-        }
         _should_stop = true;
+        _set_scanner_done();
         return true;
     }
     return false;
@@ -437,6 +429,12 @@ bool ScannerContext::no_schedule() {
     return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
 }
 
+void ScannerContext::_set_scanner_done() {
+    if (_scanner_done_dependency) {
+        _scanner_done_dependency->set_ready_for_read();
+    }
+}
+
 std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, sacnners: {}, blocks in queue: {},"
@@ -455,9 +453,6 @@ void ScannerContext::reschedule_scanner_ctx() {
     //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
     if (state.ok()) {
         _num_scheduling_ctx++;
-        if (_finish_dependency) {
-            _finish_dependency->block_finishing();
-        }
     } else {
         set_status_on_error(state, false);
     }
@@ -474,17 +469,12 @@ void 
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     // We have to decrease _num_running_scanners before schedule, otherwise
     // schedule does not woring due to _num_running_scanners.
     _num_running_scanners--;
-    if (_finish_dependency && _num_running_scanners == 0 && 
_num_scheduling_ctx == 0) {
-        _finish_dependency->set_ready_to_finish();
-    }
+    set_ready_to_finish();
 
     if (should_be_scheduled()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         } else {
             set_status_on_error(state, false);
         }
@@ -499,9 +489,7 @@ void 
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         (--_num_unfinished_scanners) == 0) {
         _dispose_coloate_blocks_not_in_queue();
         _is_finished = true;
-        if (_scanner_done_dependency) {
-            _scanner_done_dependency->set_ready_for_read();
-        }
+        _set_scanner_done();
         _blocks_queue_added_cv.notify_one();
     }
     _ctx_finish_cv.notify_one();
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 244aedf87a3..10b4775ceff 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -120,6 +120,8 @@ public:
 
     void inc_num_running_scanners(int32_t scanner_inc);
 
+    void set_ready_to_finish();
+
     int get_num_running_scanners() const { return _num_running_scanners; }
 
     void dec_num_scheduling_ctx();
@@ -185,6 +187,8 @@ private:
 protected:
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
+    void _set_scanner_done();
+
     RuntimeState* _state;
     VScanNode* _parent;
     pipeline::ScanLocalStateBase* _local_state;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index be291828f0f..891161f0710 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -363,7 +363,7 @@ VDataStreamRecvr::VDataStreamRecvr(
         for (size_t i = 0; i < num_queues; i++) {
             _sender_to_local_channel_dependency[i] =
                     
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
-                                                                            
_mem_available);
+                                                                            
_dest_node_id);
         }
     }
     _sender_queues.reserve(num_queues);
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 9f2f597494c..4ae7c3d3644 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -41,7 +41,6 @@ void 
AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
                                        pipeline::FinishDependency* finish_dep) 
{
     _dependency = dep;
     _finish_dependency = finish_dep;
-    _finish_dependency->block_finishing();
 }
 
 Status AsyncResultWriter::sink(Block* block, bool eos) {
@@ -181,10 +180,10 @@ std::unique_ptr<Block> 
AsyncResultWriter::_get_free_block(doris::vectorized::Blo
     return b;
 }
 
-pipeline::WriteDependency* AsyncResultWriter::write_blocked_by() {
+pipeline::WriteDependency* 
AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) {
     std::lock_guard l(_m);
     DCHECK(_dependency != nullptr);
-    return _dependency->write_blocked_by();
+    return _dependency->write_blocked_by(task);
 }
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index 780f8b506ef..75cc6529ba7 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -36,6 +36,7 @@ namespace pipeline {
 class AsyncWriterDependency;
 class WriteDependency;
 class FinishDependency;
+class PipelineXTask;
 
 } // namespace pipeline
 
@@ -79,7 +80,7 @@ public:
         return _data_queue_is_available() || _is_finished();
     }
 
-    pipeline::WriteDependency* write_blocked_by();
+    pipeline::WriteDependency* write_blocked_by(pipeline::PipelineXTask* task);
 
     [[nodiscard]] bool is_pending_finish() const { return 
!_writer_thread_closed; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to