This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b3512d9423 [pipelineX](bug) Fix cancel timeout (#27396)
1b3512d9423 is described below

commit 1b3512d9423168f80b4d0f9b13fbe8fc8da87fde
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 22 22:31:34 2023 +0800

    [pipelineX](bug) Fix cancel timeout (#27396)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  3 +-
 be/src/pipeline/exec/aggregation_source_operator.h |  3 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  4 +-
 be/src/pipeline/exec/analytic_source_operator.h    |  4 +-
 be/src/pipeline/exec/es_scan_operator.cpp          |  5 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 16 +++---
 be/src/pipeline/exec/exchange_sink_operator.h      | 22 +++++--
 be/src/pipeline/exec/exchange_source_operator.cpp  |  5 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  4 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |  5 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  4 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  8 +--
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  4 +-
 be/src/pipeline/exec/meta_scan_operator.cpp        |  2 +-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  4 +-
 .../pipeline/exec/multi_cast_data_stream_source.h  |  4 +-
 .../exec/nested_loop_join_build_operator.h         |  4 +-
 .../exec/nested_loop_join_probe_operator.h         |  4 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |  8 ++-
 .../pipeline/exec/partition_sort_sink_operator.cpp | 10 ++--
 .../pipeline/exec/partition_sort_sink_operator.h   |  4 +-
 .../pipeline/exec/partition_sort_source_operator.h | 22 ++++++-
 be/src/pipeline/exec/result_sink_operator.cpp      |  4 +-
 be/src/pipeline/exec/result_sink_operator.h        |  4 +-
 be/src/pipeline/exec/scan_operator.cpp             | 28 ++++-----
 be/src/pipeline/exec/scan_operator.h               | 10 ++--
 be/src/pipeline/exec/set_probe_sink_operator.h     |  4 +-
 be/src/pipeline/exec/set_sink_operator.h           |  3 +-
 be/src/pipeline/exec/set_source_operator.h         |  3 +-
 be/src/pipeline/exec/sort_sink_operator.h          |  3 +-
 be/src/pipeline/exec/sort_source_operator.h        |  3 +-
 be/src/pipeline/exec/union_sink_operator.h         |  4 +-
 be/src/pipeline/exec/union_source_operator.cpp     |  4 +-
 be/src/pipeline/exec/union_source_operator.h       |  5 +-
 be/src/pipeline/pipeline_task.h                    | 10 ++++
 be/src/pipeline/pipeline_x/dependency.cpp          | 28 ++++++---
 be/src/pipeline/pipeline_x/dependency.h            | 67 +++++++++++-----------
 .../local_exchange/local_exchange_sink_operator.h  |  4 +-
 .../local_exchange_source_operator.h               |  4 +-
 be/src/pipeline/pipeline_x/operator.cpp            | 51 +++++++---------
 be/src/pipeline/pipeline_x/operator.h              | 12 ++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  7 ++-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 23 ++++----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  7 +++
 be/src/runtime/fragment_mgr.cpp                    | 10 ++++
 be/src/runtime/query_context.cpp                   | 43 ++++++++++++++
 be/src/runtime/query_context.h                     | 18 +++---
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 10 ++--
 48 files changed, 318 insertions(+), 200 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 3f1ce260365..2d16df2be25 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -48,7 +48,8 @@ public:
 class AggSinkDependency final : public Dependency {
 public:
     using SharedState = AggSharedState;
-    AggSinkDependency(int id, int node_id) : Dependency(id, node_id, "AggSinkDependency", true) {}
+    AggSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AggSinkDependency", true, query_ctx) {}
     ~AggSinkDependency() override = default;
 
     void set_ready() override {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 79671fb9c74..9c6d3e0fd0d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -51,7 +51,8 @@ public:
 class AggSourceDependency final : public Dependency {
 public:
     using SharedState = AggSharedState;
-    AggSourceDependency(int id, int node_id) : Dependency(id, node_id, "AggSourceDependency") {}
+    AggSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AggSourceDependency", query_ctx) {}
     ~AggSourceDependency() override = default;
 
     void block() override {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index a90321f5e31..1e8152a28fa 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -48,8 +48,8 @@ public:
 class AnalyticSinkDependency final : public Dependency {
 public:
     using SharedState = AnalyticSharedState;
-    AnalyticSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "AnalyticSinkDependency", true) {}
+    AnalyticSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AnalyticSinkDependency", true, query_ctx) {}
     ~AnalyticSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index 0a741181f87..8b76fbfe263 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -49,8 +49,8 @@ public:
 class AnalyticSourceDependency final : public Dependency {
 public:
     using SharedState = AnalyticSharedState;
-    AnalyticSourceDependency(int id, int node_id)
-            : Dependency(id, node_id, "AnalyticSourceDependency") {}
+    AnalyticSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AnalyticSourceDependency", query_ctx) {}
     ~AnalyticSourceDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp
index b9112917954..8567db90948 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() {
 
 Status EsScanLocalState::_process_conjuncts() {
     RETURN_IF_ERROR(Base::_process_conjuncts());
-    if (Base::_scan_dependency->eos()) {
+    if (Base::_eos) {
         return Status::OK();
     }
 
@@ -66,7 +66,8 @@ Status EsScanLocalState::_process_conjuncts() {
 
 Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        Base::_scan_dependency->set_eos();
+        _eos = true;
+        _scan_dependency->set_ready();
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 616a1bd76fe..f0e03596cc1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -174,16 +174,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
 
     register_channels(_sink_buffer.get());
 
-    _exchange_sink_dependency =
-            AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
-    _queue_dependency =
-            ExchangeSinkQueueDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _exchange_sink_dependency = AndDependency::create_shared(
+            _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
+    _queue_dependency = ExchangeSinkQueueDependency::create_shared(
+            _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
     _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
         !only_local_exchange) {
-        _broadcast_dependency =
-                BroadcastDependency::create_shared(_parent->operator_id(), _parent->node_id());
+        _broadcast_dependency = BroadcastDependency::create_shared(
+                _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
         _broadcast_dependency->set_available_block(config::num_broadcast_buffer);
         _broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
         for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
@@ -198,8 +198,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         size_t dep_id = 0;
         _local_channels_dependency.resize(local_size);
         _wait_channel_timer.resize(local_size);
-        auto deps_for_channels =
-                AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
+        auto deps_for_channels = AndDependency::create_shared(
+                _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
         for (auto channel : channels) {
             if (channel->is_local()) {
                 _local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 89b024a6c6e..19a326d5c6e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -67,18 +67,28 @@ private:
 class ExchangeSinkQueueDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
-    ExchangeSinkQueueDependency(int id, int node_id)
-            : Dependency(id, node_id, "ResultQueueDependency", true) {}
+    ExchangeSinkQueueDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "ResultQueueDependency", true, query_ctx) {}
     ~ExchangeSinkQueueDependency() override = default;
 };
 
 class BroadcastDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastDependency);
-    BroadcastDependency(int id, int node_id)
-            : Dependency(id, node_id, "BroadcastDependency", true), _available_block(0) {}
+    BroadcastDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "BroadcastDependency", true, query_ctx),
+              _available_block(0) {}
     ~BroadcastDependency() override = default;
 
+    std::string debug_string(int indentation_level = 0) override {
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer,
+                       "{}{}: id={}, block task = {}, ready={}, _available_block = {}",
+                       std::string(indentation_level * 2, ' '), _name, _node_id,
+                       _blocked_task.size(), _ready, _available_block.load());
+        return fmt::to_string(debug_string_buffer);
+    }
+
     void set_available_block(int available_block) { _available_block = available_block; }
 
     void return_available_block() {
@@ -128,8 +138,8 @@ private:
 class LocalExchangeChannelDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
-    LocalExchangeChannelDependency(int id, int node_id)
-            : Dependency(id, node_id, "LocalExchangeChannelDependency", true) {}
+    LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
     ~LocalExchangeChannelDependency() override = default;
     // TODO(gabriel): blocked by memory
 };
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 00cead5b2ca..3b630cfcfcf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -51,13 +51,14 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
             profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
-    source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                     state->get_query_ctx());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
     for (size_t i = 0; i < queues.size(); i++) {
         deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), _parent->node_id(),
-                                                        queues[i]);
+                                                        state->get_query_ctx(), queues[i]);
         queues[i]->set_dependency(deps[i]);
         source_dependency->add_child(deps[i]);
     }
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 4a2bae8298d..8e745def1ba 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -53,9 +53,9 @@ public:
 struct ExchangeDataDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
-    ExchangeDataDependency(int id, int node_id,
+    ExchangeDataDependency(int id, int node_id, QueryContext* query_ctx,
                            vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
-            : Dependency(id, node_id, "DataDependency") {}
+            : Dependency(id, node_id, "DataDependency", query_ctx) {}
 };
 
 class ExchangeSourceOperatorX;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 369ad607c6f..819575211c9 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -32,7 +32,8 @@ namespace doris::pipeline {
 
 Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        Base::_scan_dependency->set_eos();
+        _eos = true;
+        _scan_dependency->set_ready();
         return Status::OK();
     }
 
@@ -95,7 +96,7 @@ Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
 
 Status FileScanLocalState::_process_conjuncts() {
     RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
-    if (Base::_scan_dependency->eos()) {
+    if (Base::_eos) {
         return Status::OK();
     }
     // TODO: Push conjuncts down to reader.
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 53b20f53cd8..41b030b4e18 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -48,8 +48,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _shared_hash_table_dependency =
-            SharedHashTableDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _shared_hash_table_dependency = SharedHashTableDependency::create_shared(
+            _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index e3f10a1feb0..b0618d59927 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -49,16 +49,16 @@ class HashJoinBuildSinkOperatorX;
 class SharedHashTableDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
-    SharedHashTableDependency(int id, int node_id)
-            : Dependency(id, node_id, "SharedHashTableDependency", true) {}
+    SharedHashTableDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SharedHashTableDependency", true, query_ctx) {}
     ~SharedHashTableDependency() override = default;
 };
 
 class HashJoinBuildSinkDependency final : public Dependency {
 public:
     using SharedState = HashJoinSharedState;
-    HashJoinBuildSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "HashJoinBuildSinkDependency", true) {}
+    HashJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "HashJoinBuildSinkDependency", true, query_ctx) {}
     ~HashJoinBuildSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 64f3cce2d7a..583bba1b006 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -64,8 +64,8 @@ using HashTableCtxVariants = std::variant<
 class HashJoinProbeDependency final : public Dependency {
 public:
     using SharedState = HashJoinSharedState;
-    HashJoinProbeDependency(int id, int node_id)
-            : Dependency(id, node_id, "HashJoinProbeDependency") {}
+    HashJoinProbeDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "HashJoinProbeDependency", query_ctx) {}
     ~HashJoinProbeDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp
index 2de19bb2ced..749fbcf333a 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -22,7 +22,7 @@
 namespace doris::pipeline {
 
 Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
-    if (Base::_scan_dependency->eos()) {
+    if (Base::_eos) {
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index c9b3dcb4798..a2ad07e5297 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -44,8 +44,8 @@ public:
 class MultiCastSinkDependency final : public Dependency {
 public:
     using SharedState = MultiCastSharedState;
-    MultiCastSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "MultiCastSinkDependency", true) {}
+    MultiCastSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "MultiCastSinkDependency", true, query_ctx) {}
     ~MultiCastSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 6c3a4cfbbc0..86034a76ce7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -95,8 +95,8 @@ private:
 class MultiCastSourceDependency final : public Dependency {
 public:
     using SharedState = MultiCastSharedState;
-    MultiCastSourceDependency(int id, int node_id)
-            : Dependency(id, node_id, "MultiCastSourceDependency") {}
+    MultiCastSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "MultiCastSourceDependency", query_ctx) {}
     ~MultiCastSourceDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index a02fb8ec1a1..0097b75c0ab 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -47,8 +47,8 @@ public:
 class NestedLoopJoinBuildSinkDependency final : public Dependency {
 public:
     using SharedState = NestedLoopJoinSharedState;
-    NestedLoopJoinBuildSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency", true) {}
+    NestedLoopJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency", true, query_ctx) {}
     ~NestedLoopJoinBuildSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 34f123ba32a..d7a9b54e897 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -54,8 +54,8 @@ public:
 class NestedLoopJoinProbeDependency final : public Dependency {
 public:
     using SharedState = NestedLoopJoinSharedState;
-    NestedLoopJoinProbeDependency(int id, int node_id)
-            : Dependency(id, node_id, "NestedLoopJoinProbeDependency") {}
+    NestedLoopJoinProbeDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "NestedLoopJoinProbeDependency", query_ctx) {}
     ~NestedLoopJoinProbeDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index acda79bdfb3..059f961501d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() {
 Status OlapScanLocalState::_process_conjuncts() {
     SCOPED_TIMER(_process_conjunct_timer);
     RETURN_IF_ERROR(ScanLocalState::_process_conjuncts());
-    if (ScanLocalState::_scan_dependency->eos()) {
+    if (ScanLocalState::_eos) {
         return Status::OK();
     }
     RETURN_IF_ERROR(_build_key_ranges_and_filters());
@@ -213,7 +213,8 @@ bool OlapScanLocalState::_storage_no_merge() {
 
 Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        ScanLocalState::_scan_dependency->set_eos();
+        _eos = true;
+        _scan_dependency->set_ready();
         return Status::OK();
     }
     SCOPED_TIMER(_scanner_init_timer);
@@ -408,7 +409,8 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
                     iter->second));
         }
         if (eos) {
-            ScanLocalState::_scan_dependency->set_eos();
+            _eos = true;
+            _scan_dependency->set_ready();
         }
 
         for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 168d499e661..d7a2bc4c920 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -18,11 +18,10 @@
 #include "partition_sort_sink_operator.h"
 
 #include "common/status.h"
+#include "partition_sort_source_operator.h"
 #include "vec/common/hash_table/hash.h"
 
-namespace doris {
-
-namespace pipeline {
+namespace doris::pipeline {
 
 OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
     return std::make_shared<PartitionSortSinkOperator>(this, _node);
@@ -154,7 +153,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
 
         COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));
         //so all data from child have sink completed
-        local_state._dependency->set_eos();
+        ((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready();
     }
 
     return Status::OK();
@@ -291,5 +290,4 @@ void PartitionSortSinkLocalState::_init_hash_method() {
     }
 }
 
-} // namespace pipeline
-} // namespace doris
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h
index a77df27cacf..dbfb7ad5d0e 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -53,8 +53,8 @@ public:
 class PartitionSortSinkDependency final : public Dependency {
 public:
     using SharedState = PartitionSortNodeSharedState;
-    PartitionSortSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "PartitionSortSinkDependency", true) {}
+    PartitionSortSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "PartitionSortSinkDependency", true, query_ctx) {}
     ~PartitionSortSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h
index 0ef89c5068a..df22bde6365 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -52,9 +52,27 @@ public:
 class PartitionSortSourceDependency final : public Dependency {
 public:
     using SharedState = PartitionSortNodeSharedState;
-    PartitionSortSourceDependency(int id, int node_id)
-            : Dependency(id, node_id, "PartitionSortSourceDependency") {}
+    PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "PartitionSortSourceDependency", query_ctx) {}
     ~PartitionSortSourceDependency() override = default;
+
+    void block() override {
+        if (_always_ready) {
+            return;
+        }
+        Dependency::block();
+    }
+
+    void set_always_ready() {
+        if (_always_ready) {
+            return;
+        }
+        _always_ready = true;
+        set_ready();
+    }
+
+private:
+    std::atomic<bool> _always_ready {false};
 };
 
 class PartitionSortSourceOperatorX;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 7f6d1673e95..8c314b995bb 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -62,8 +62,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
-    _result_sink_dependency =
-            ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _result_sink_dependency = ResultSinkDependency::create_shared(
+            _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     ((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index e7af819e621..eedd2d4c053 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -46,8 +46,8 @@ public:
 class ResultSinkDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultSinkDependency);
-    ResultSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "ResultSinkDependency", true) {}
+    ResultSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "ResultSinkDependency", true, query_ctx) {}
     ~ResultSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 15b77af6ff9..93c1eb9f982 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -123,7 +123,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
 
     _scan_dependency = ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-                                                     PipelineXLocalState<>::_parent->node_id());
+                                                     PipelineXLocalState<>::_parent->node_id(),
+                                                     state->get_query_ctx());
 
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
@@ -147,10 +148,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     RETURN_IF_ERROR(_init_profile());
     // if you want to add some profile in scan node, even it have not new VScanner object
     // could add here, not in the _init_profile() function
-    _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime");
-
     _prepare_rf_timer(_runtime_profile.get());
-    _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime");
 
     static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
@@ -171,10 +169,10 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
 
-    auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners();
+    auto status = _eos ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
         _finish_dependency->block();
-        DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0);
+        DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     }
@@ -262,7 +260,8 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
         std::visit(
                 [&](auto&& range) {
                     if (range.is_empty_value_range()) {
-                        _scan_dependency->set_eos();
+                        _eos = true;
+                        _scan_dependency->set_ready();
                     }
                 },
                 it.second.second);
@@ -561,7 +560,8 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
             constant_val = const_cast<char*>(const_column->get_data_at(0).data);
             if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
                 *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
-                _scan_dependency->set_eos();
+                _eos = true;
+                _scan_dependency->set_ready();
             }
         } else if (const vectorized::ColumnVector<vectorized::UInt8>* 
bool_column =
                            
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -578,7 +578,8 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
                 constant_val = const_cast<char*>(bool_column->get_data_at(0).data);
                 if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
                     *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
-                    _scan_dependency->set_eos();
+                    _eos = true;
+                    _scan_dependency->set_ready();
                 }
             } else {
                 LOG(WARNING) << "Constant predicate in scan node should return a bool column with "
@@ -775,7 +776,8 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
         HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
         auto fn_name = std::string("");
         if (!is_fixed_range && state->null_in_set) {
-            _scan_dependency->set_eos();
+            _eos = true;
+            _scan_dependency->set_ready();
         }
         while (iter->has_next()) {
             // column not in (nullptr) is always true
@@ -1168,7 +1170,8 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<vectorized::VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
     if (scanners.empty()) {
-        _scan_dependency->set_eos();
+        _eos = true;
+        _scan_dependency->set_ready();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(scanners));
@@ -1350,7 +1353,6 @@ template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
                                                 SourceState& source_state) {
     auto& local_state = get_local_state(state);
-    SCOPED_TIMER(local_state._get_next_timer);
     SCOPED_TIMER(local_state.exec_time_counter());
     // in inverted index apply logic, in order to optimize query performance,
     // we built some temporary columns into block, these columns only used in scan node level,
@@ -1376,7 +1378,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
         }
     }
 
-    if (local_state._scan_dependency->eos()) {
+    if (local_state._eos) {
         source_state = SourceState::FINISHED;
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 06a6c611294..23adfffc3a6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -59,8 +59,8 @@ public:
 class ScanDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ScanDependency);
-    ScanDependency(int id, int node_id)
-            : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {}
+    ScanDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "ScanDependency", query_ctx), _scanner_ctx(nullptr) {}
 
     // TODO(gabriel):
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
@@ -71,7 +71,7 @@ public:
         return Dependency::is_blocked_by(task);
     }
 
-    bool push_to_blocking_queue() override { return true; }
+    bool push_to_blocking_queue() const override { return true; }
 
     void block() override {
         if (_scanner_done) {
@@ -384,9 +384,7 @@ protected:
     // "_colname_to_value_range" and in "_not_in_value_ranges"
     std::vector<ColumnValueRangeType> _not_in_value_ranges;
 
-    RuntimeProfile::Counter* _get_next_timer = nullptr;
-    RuntimeProfile::Counter* _alloc_resource_timer = nullptr;
-    RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
+    bool _eos = false;
 
     doris::Mutex _block_lock;
 };
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index cd1dbd6267a..89a2ab3bb38 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -70,8 +70,8 @@ private:
 class SetProbeSinkDependency final : public Dependency {
 public:
     using SharedState = SetSharedState;
-    SetProbeSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "SetProbeSinkDependency", true) {}
+    SetProbeSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SetProbeSinkDependency", true, query_ctx) {}
     ~SetProbeSinkDependency() override = default;
 
     void set_cur_child_id(int id) {
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index e318c354c88..b8ca789b780 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -63,7 +63,8 @@ private:
 class SetSinkDependency final : public Dependency {
 public:
     using SharedState = SetSharedState;
-    SetSinkDependency(int id, int node_id) : Dependency(id, node_id, "SetSinkDependency", true) {}
+    SetSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SetSinkDependency", true, query_ctx) {}
     ~SetSinkDependency() override = default;
 
     void set_cur_child_id(int id) {
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
index cc7275444c6..44800f23f41 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -56,7 +56,8 @@ public:
 class SetSourceDependency final : public Dependency {
 public:
     using SharedState = SetSharedState;
-    SetSourceDependency(int id, int node_id) : Dependency(id, node_id, "SetSourceDependency") {}
+    SetSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SetSourceDependency", query_ctx) {}
     ~SetSourceDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 67305aad691..8730780b54c 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -48,7 +48,8 @@ public:
 class SortSinkDependency final : public Dependency {
 public:
     using SharedState = SortSharedState;
-    SortSinkDependency(int id, int node_id) : Dependency(id, node_id, "SortSinkDependency", true) {}
+    SortSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SortSinkDependency", true, query_ctx) {}
     ~SortSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h
index 3b615a58be2..efbddfd1050 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -48,7 +48,8 @@ public:
 class SortSourceDependency final : public Dependency {
 public:
     using SharedState = SortSharedState;
-    SortSourceDependency(int id, int node_id) : Dependency(id, node_id, "SortSourceDependency") {}
+    SortSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "SortSourceDependency", query_ctx) {}
     ~SortSourceDependency() override = default;
 };
 
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index e135566417e..f3031dd1015 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -69,8 +69,8 @@ private:
 class UnionSinkDependency final : public Dependency {
 public:
     using SharedState = UnionSharedState;
-    UnionSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "UnionSinkDependency", true) {}
+    UnionSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "UnionSinkDependency", true, query_ctx) {}
     ~UnionSinkDependency() override = default;
     void block() override {}
 };
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index fc1dd124a0e..d824f9db7a0 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -115,8 +115,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         DCHECK(deps.size() == 1);
         DCHECK(deps.front() == nullptr);
         //child_count == 0 , we need to creat a  UnionDependency
-        deps.front() =
-                std::make_shared<UnionSourceDependency>(_parent->operator_id(), _parent->node_id());
+        deps.front() = std::make_shared<UnionSourceDependency>(
+                _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
         ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
     }
     RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 0c846b828f9..8b7060884e9 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -72,7 +72,8 @@ private:
 class UnionSourceDependency final : public Dependency {
 public:
     using SharedState = UnionSharedState;
-    UnionSourceDependency(int id, int node_id) : Dependency(id, node_id, "UnionSourceDependency") {}
+    UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
     ~UnionSourceDependency() override = default;
 
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
@@ -85,7 +86,7 @@ public:
         }
         return this;
     }
-    bool push_to_blocking_queue() override { return true; }
+    bool push_to_blocking_queue() const override { return true; }
     void block() override {}
 };
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index a302dc7c34d..0b26c9d215d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -104,6 +104,16 @@ inline const char* get_state_name(PipelineTaskState idx) {
     __builtin_unreachable();
 }
 
+inline bool is_final_state(PipelineTaskState idx) {
+    switch (idx) {
+    case PipelineTaskState::FINISHED:
+    case PipelineTaskState::CANCELED:
+        return true;
+    default:
+        return false;
+    }
+}
+
 class TaskQueue;
 class PriorityTaskQueue;
 
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index c149da54fde..00a7d012c98 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -28,7 +28,7 @@
 
 namespace doris::pipeline {
 
-void Dependency::add_block_task(PipelineXTask* task) {
+void Dependency::_add_block_task(PipelineXTask* task) {
     DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
             << "Duplicate task: " << task->debug_string();
     _blocked_task.push_back(task);
@@ -54,16 +54,19 @@ void Dependency::set_ready() {
 }
 
 Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
-    if (config::enable_fuzzy_mode && !_ready && _should_log(_watcher.elapsed_time())) {
-        LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                     << _node_id << " block tasks: " << _blocked_task.size()
-                     << "task: " << (task ? task->fragment_context()->debug_string() : "");
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && !push_to_blocking_queue() && task) {
+        _add_block_task(task);
     }
+    return ready ? nullptr : this;
+}
 
+Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load();
     if (!ready && !push_to_blocking_queue() && task) {
-        add_block_task(task);
+        _add_block_task(task);
     }
     return ready ? nullptr : this;
 }
@@ -73,9 +76,9 @@ Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
         return nullptr;
     }
     std::unique_lock<std::mutex> lc(_task_lock);
-    if (*_blocked_by_rf) {
+    if (*_blocked_by_rf && !_is_cancelled()) {
         if (LIKELY(task)) {
-            add_block_task(task);
+            _add_block_task(task);
         }
         return this;
     }
@@ -90,6 +93,15 @@ std::string Dependency::debug_string(int indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::string RuntimeFilterDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
+                   _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+    return fmt::to_string(debug_string_buffer);
+}
+
 std::string AndDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 662e789d65b..13ae1cd9d7d 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -61,30 +61,31 @@ struct BasicSharedState {
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
-    Dependency(int id, int node_id, std::string name)
+    Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
             : _id(id),
               _node_id(node_id),
               _name(std::move(name)),
               _is_write_dependency(false),
-              _ready(false) {}
-    Dependency(int id, int node_id, std::string name, bool ready)
+              _ready(false),
+              _query_ctx(query_ctx) {}
+    Dependency(int id, int node_id, std::string name, bool ready, QueryContext* query_ctx)
             : _id(id),
               _node_id(node_id),
               _name(std::move(name)),
               _is_write_dependency(true),
-              _ready(ready) {}
+              _ready(ready),
+              _query_ctx(query_ctx) {}
     virtual ~Dependency() = default;
 
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
-    void set_parent(std::weak_ptr<Dependency> parent) { _parent = parent; }
     void add_child(std::shared_ptr<Dependency> child) { _children.push_back(child); }
     std::shared_ptr<BasicSharedState> shared_state() { return _shared_state; }
     void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
         _shared_state = shared_state;
     }
     virtual std::string debug_string(int indentation_level = 0);
-    virtual bool push_to_blocking_queue() { return false; }
+    virtual bool push_to_blocking_queue() const { return false; }
 
     // Start the watcher. We use it to count how long this dependency block the current pipeline task.
     void start_watcher() {
@@ -104,26 +105,9 @@ public:
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
-    void set_eos() {
-        if (_eos) {
-            return;
-        }
-        _eos = true;
-        set_ready();
-        if (_is_write_dependency && _shared_state->source_dep != nullptr) {
-            _shared_state->source_dep->set_eos();
-        }
-    }
-    bool eos() const { return _eos.load(); }
 
     // Notify downstream pipeline tasks this dependency is blocked.
-    virtual void block() {
-        if (_eos) {
-            return;
-        }
-        _ready = false;
-    }
-    void add_block_task(PipelineXTask* task);
+    virtual void block() { _ready = false; }
 
 protected:
     bool _should_log(uint64_t cur_time) {
@@ -136,14 +120,19 @@ protected:
         _last_log_time = cur_time;
         return true;
     }
+    void _add_block_task(PipelineXTask* task);
+    bool _is_cancelled() const {
+        return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
+    }
 
     const int _id;
     const int _node_id;
     const std::string _name;
     const bool _is_write_dependency;
+    std::atomic<bool> _ready;
+    const QueryContext* _query_ctx;
 
     std::shared_ptr<BasicSharedState> _shared_state {nullptr};
-    std::atomic<bool> _ready;
     MonotonicStopWatch _watcher;
     std::weak_ptr<Dependency> _parent;
     std::list<std::shared_ptr<Dependency>> _children;
@@ -151,7 +140,6 @@ protected:
     uint64_t _last_log_time = 0;
     std::mutex _task_lock;
     std::vector<PipelineXTask*> _blocked_task;
-    std::atomic<bool> _eos {false};
 };
 
 struct FakeSharedState : public BasicSharedState {};
@@ -159,11 +147,21 @@ struct FakeSharedState : public BasicSharedState {};
 struct FakeDependency final : public Dependency {
 public:
     using SharedState = FakeSharedState;
-    FakeDependency(int id, int node_id) : Dependency(id, node_id, "FakeDependency") {}
+    FakeDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "FakeDependency", query_ctx) {}
 
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { return nullptr; }
 };
 
+struct FinishDependency final : public Dependency {
+public:
+    using SharedState = FakeSharedState;
+    FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
+            : Dependency(id, node_id, name, true, query_ctx) {}
+
+    [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
+};
+
 class RuntimeFilterDependency;
 class RuntimeFilterTimer {
 public:
@@ -200,15 +198,17 @@ private:
 
 class RuntimeFilterDependency final : public Dependency {
 public:
-    RuntimeFilterDependency(int id, int node_id, std::string name)
-            : Dependency(id, node_id, name) {}
-    Dependency* is_blocked_by(PipelineXTask* task);
+    RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
+            : Dependency(id, node_id, name, query_ctx) {}
+    Dependency* is_blocked_by(PipelineXTask* task) override;
     void add_filters(IRuntimeFilter* runtime_filter);
     void sub_filters();
     void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
         _blocked_by_rf = blocked_by_rf;
     }
 
+    std::string debug_string(int indentation_level = 0) override;
+
 protected:
     std::atomic_int _filters;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
@@ -218,7 +218,8 @@ class AndDependency final : public Dependency {
 public:
     using SharedState = FakeSharedState;
     ENABLE_FACTORY_CREATOR(AndDependency);
-    AndDependency(int id, int node_id) : Dependency(id, node_id, "AndDependency") {}
+    AndDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AndDependency", query_ctx) {}
 
     [[nodiscard]] std::string name() const override {
         fmt::memory_buffer debug_string_buffer;
@@ -371,8 +372,8 @@ class AsyncWriterDependency final : public Dependency {
 public:
     using SharedState = FakeSharedState;
     ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
-    AsyncWriterDependency(int id, int node_id)
-            : Dependency(id, node_id, "AsyncWriterDependency", true) {}
+    AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "AsyncWriterDependency", true, query_ctx) {}
     ~AsyncWriterDependency() override = default;
 };
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index c6c28fbe8d2..b6ce3fbeb9e 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -25,8 +25,8 @@ namespace doris::pipeline {
 struct LocalExchangeSinkDependency final : public Dependency {
 public:
     using SharedState = LocalExchangeSharedState;
-    LocalExchangeSinkDependency(int id, int node_id)
-            : Dependency(id, node_id, "LocalExchangeSinkDependency", true) {}
+    LocalExchangeSinkDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "LocalExchangeSinkDependency", true, query_ctx) {}
     ~LocalExchangeSinkDependency() override = default;
 };
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index dfc89a86c9e..ebf18d9a249 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -25,8 +25,8 @@ namespace doris::pipeline {
 struct LocalExchangeSourceDependency final : public Dependency {
 public:
     using SharedState = LocalExchangeSharedState;
-    LocalExchangeSourceDependency(int id, int node_id)
-            : Dependency(id, node_id, "LocalExchangeSourceDependency") {}
+    LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx)
+            : Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {}
     ~LocalExchangeSourceDependency() override = default;
 };
 
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 4e3fec7abbf..050b198a22e 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -72,18 +72,12 @@
 
 namespace doris::pipeline {
 
-std::string PipelineXLocalStateBase::debug_string(int indentation_level) const {
-    return _parent->debug_string(indentation_level);
-}
-
 template <typename DependencyType>
 std::string PipelineXLocalState<DependencyType>::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}",
-                   PipelineXLocalStateBase::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level));
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
-        fmt::format_to(debug_string_buffer, "\nDependency: \n {}",
-                       _dependency->debug_string(indentation_level + 1));
+        fmt::format_to(debug_string_buffer, " Dependency: {}", _dependency->debug_string());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -91,12 +85,9 @@ std::string PipelineXLocalState<DependencyType>::debug_string(int indentation_le
 template <typename DependencyType>
 std::string PipelineXSinkLocalState<DependencyType>::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}",
-                   PipelineXSinkLocalStateBase::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level));
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
-        fmt::format_to(debug_string_buffer, "\n{}Dependency: \n {}",
-                       std::string(indentation_level * 2, ' '),
-                       _dependency->debug_string(indentation_level + 1));
+        fmt::format_to(debug_string_buffer, ", Dependency: {}", _dependency->debug_string());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -245,10 +236,6 @@ std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
     return fmt::to_string(debug_string_buffer);
 }
 
-std::string PipelineXSinkLocalStateBase::debug_string(int indentation_level) const {
-    return _parent->debug_string(indentation_level);
-}
-
 std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const {
     return state->get_sink_local_state(operator_id())->debug_string(indentation_level);
 }
@@ -294,7 +281,8 @@ template <>
 inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
 
 template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& dependency) {
+void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& dependency,
+                                                       QueryContext* ctx) {
     std::shared_ptr<typename LocalStateType::DependencyType::SharedState> ss = nullptr;
     if constexpr (NeedToCreate<typename LocalStateType::DependencyType::SharedState>) {
         ss.reset(new typename LocalStateType::DependencyType::SharedState());
@@ -302,8 +290,8 @@ void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
     if constexpr (!std::is_same_v<typename LocalStateType::DependencyType, FakeDependency>) {
         auto& dests = dests_id();
         for (auto& dest_id : dests) {
-            dependency.push_back(
-                    std::make_shared<typename LocalStateType::DependencyType>(dest_id, _node_id));
+            dependency.push_back(std::make_shared<typename LocalStateType::DependencyType>(
+                    dest_id, _node_id, ctx));
             dependency.back()->set_shared_state(ss);
         }
     } else {
@@ -312,8 +300,8 @@ void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
 }
 
 template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency() {
-    return std::make_shared<typename LocalStateType::DependencyType>(_operator_id, _node_id);
+DependencySPtr OperatorX<LocalStateType>::get_dependency(QueryContext* ctx) {
+    return std::make_shared<typename LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
 }
 
 template <typename LocalStateType>
@@ -328,8 +316,9 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
                                                          RuntimeState* state)
         : _parent(parent),
           _state(state),
-          _finish_dependency(new Dependency(parent->operator_id(), parent->node_id(),
-                                            parent->get_name() + "_FINISH_DEPENDENCY", true)) {}
+          _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
+                                                  parent->get_name() + "_FINISH_DEPENDENCY",
+                                                  state->get_query_ctx())) {}
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
         : _num_rows_returned(0),
@@ -337,10 +326,12 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
           _peak_memory_usage_counter(nullptr),
           _parent(parent),
           _state(state),
-          _finish_dependency(new Dependency(parent->operator_id(), parent->node_id(),
-                                            parent->get_name() + "_FINISH_DEPENDENCY", true)) {
+          _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
+                                                  parent->get_name() + "_FINISH_DEPENDENCY",
+                                                  state->get_query_ctx())) {
     _filter_dependency = std::make_shared<RuntimeFilterDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY");
+            parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
+            state->get_query_ctx());
 }
 
 template <typename DependencyType>
@@ -421,7 +412,7 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
         }
     } else {
         auto& deps = info.dependencys;
-        deps.front() = std::make_shared<FakeDependency>(0, 0);
+        deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
         _dependency = (DependencyType*)deps.front().get();
     }
     _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1);
@@ -499,8 +490,8 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkState
                 _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
     }
     _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
-    _async_writer_dependency =
-            AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _async_writer_dependency = AsyncWriterDependency::create_shared(
+            _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
     _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());
 
     _wait_for_dependency_timer =
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 5afa080e6d3..943d8c4670b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -97,7 +97,7 @@ public:
     void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
     void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
 
-    [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const;
+    [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
 
     virtual Dependency* dependency() { return nullptr; }
 
@@ -176,7 +176,7 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    virtual DependencySPtr get_dependency() = 0;
+    virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
 
     Status prepare(RuntimeState* state) override;
 
@@ -307,7 +307,7 @@ public:
         return state->get_local_state(operator_id())->template cast<LocalState>();
     }
 
-    DependencySPtr get_dependency() override;
+    DependencySPtr get_dependency(QueryContext* ctx) override;
 };
 
 template <typename DependencyArg = FakeDependency>
@@ -348,7 +348,7 @@ public:
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
     virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
 
-    [[nodiscard]] virtual std::string debug_string(int indentation_level) const;
+    [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
 
     template <class TARGET>
     TARGET& cast() {
@@ -456,7 +456,7 @@ public:
         return reinterpret_cast<const TARGET&>(*this);
     }
 
-    virtual void get_dependency(std::vector<DependencySPtr>& dependency) = 0;
+    virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0;
 
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -551,7 +551,7 @@ public:
     ~DataSinkOperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
-    void get_dependency(std::vector<DependencySPtr>& dependency) override;
+    void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) override;
 
     using LocalState = LocalStateType;
     [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b087c3bcab8..e47b55e4914 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -131,7 +131,7 @@ void PipelineXFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
             .tag("fragment_id", _fragment_id)
             .tag("reason", reason)
             .tag("error message", msg);
-    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
+    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext 
cancel instance: "
                                                 << 
print_id(runtime_state->fragment_instance_id());)
@@ -149,6 +149,11 @@ void PipelineXFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
     }
+    for (auto& tasks : _tasks) {
+        for (auto& task : tasks) {
+            task->clear_blocking_state();
+        }
+    }
 }
 
 Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& 
request) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 5b3524c69a2..703e4862f7c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -57,9 +57,9 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t 
task_id, RuntimeSta
           _local_exchange_state(local_exchange_state),
           _task_idx(task_idx) {
     _pipeline_task_watcher.start();
-    _sink->get_dependency(_downstream_dependency);
+    _sink->get_dependency(_downstream_dependency, state->get_query_ctx());
     for (auto& op : _operators) {
-        _source_dependency.insert({op->operator_id(), op->get_dependency()});
+        _source_dependency.insert({op->operator_id(), 
op->get_dependency(state->get_query_ctx())});
     }
 }
 
@@ -357,21 +357,24 @@ std::string PipelineXTask::debug_string() {
                    _opened ? _sink->debug_string(_state, _operators.size())
                            : _sink->debug_string(_operators.size()));
     fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
-    for (size_t i = 0; i < _read_dependencies.size(); i++) {
-        fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
-                       _read_dependencies[i]->debug_string());
+    size_t i = 0;
+    for (; i < _read_dependencies.size(); i++) {
+        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+                       _read_dependencies[i]->debug_string(i + 1));
     }
 
     fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
-    fmt::format_to(debug_string_buffer, "{}\n", 
_write_dependencies->debug_string());
+    fmt::format_to(debug_string_buffer, "{}. {}\n", i, 
_write_dependencies->debug_string(1));
+    i++;
 
     fmt::format_to(debug_string_buffer, "Runtime Filter Dependency 
Information: \n");
-    fmt::format_to(debug_string_buffer, "{}\n", 
_filter_dependency->debug_string());
+    fmt::format_to(debug_string_buffer, "{}. {}\n", i, 
_filter_dependency->debug_string(1));
+    i++;
 
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
-    for (size_t i = 0; i < _finish_dependencies.size(); i++) {
-        fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
-                       _finish_dependencies[i]->debug_string());
+    for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) { // i: running label only
+        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+                       _finish_dependencies[j]->debug_string(j + 1)); // index with j, not i
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index e0d0e58e658..46d006d0dac 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -142,6 +142,13 @@ public:
         }
         _use_blocking_queue = false;
     }
+    void clear_blocking_state() { // Wakes the dependency recorded in _blocked_dep, if any.
+        if (!is_final_state(get_state()) && get_state() != 
PipelineTaskState::PENDING_FINISH && // no-op for final states and PENDING_FINISH
+            _blocked_dep) {
+            _blocked_dep->set_ready(); // mark the recorded dependency ready
+            _blocked_dep = nullptr; // forget it so a later call does not signal it again
+        }
+    }
 
 private:
     Dependency* _write_blocked_dependency() {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 210de93a7ec..c0153652d26 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -560,6 +560,12 @@ void FragmentMgr::remove_pipeline_context(
     f_context->instance_ids(ins_ids);
     bool all_done = q_context->countdown(ins_ids.size());
     for (const auto& ins_id : ins_ids) {
+        {
+            std::lock_guard<std::mutex> plock(q_context->pipeline_lock);
+            if 
(q_context->fragment_id_to_pipeline_ctx.contains(f_context->get_fragment_id())) 
{
+                
q_context->fragment_id_to_pipeline_ctx.erase(f_context->get_fragment_id());
+            }
+        }
         LOG_INFO("Removing query {} instance {}, all done? {}", 
print_id(query_id),
                  print_id(ins_id), all_done);
         _pipeline_map.erase(ins_id);
@@ -866,6 +872,10 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
 
             _cv.notify_all();
         }
+        {
+            std::lock_guard<std::mutex> lock(query_ctx->pipeline_lock);
+            query_ctx->fragment_id_to_pipeline_ctx.insert({params.fragment_id, 
context});
+        }
 
         RETURN_IF_ERROR(context->submit());
         return Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
new file mode 100644
index 00000000000..823893dbe74
--- /dev/null
+++ b/be/src/runtime/query_context.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/query_context.h"
+
+#include "pipeline/pipeline_fragment_context.h"
+
+namespace doris {
+
+bool QueryContext::cancel(bool v, std::string msg, Status new_status, int 
fragment_id) { // Returns false if already cancelled, true after cancelling.
+    if (_is_cancelled) { // already cancelled: nothing more to do
+        return false;
+    }
+    set_exec_status(new_status);
+    _is_cancelled.store(v); // NOTE(review): check above and this store are separate steps; confirm callers cannot race
+
+    set_ready_to_execute(true);
+    {
+        std::lock_guard<std::mutex> plock(pipeline_lock); // held while iterating fragment_id_to_pipeline_ctx
+        for (auto& ctx : fragment_id_to_pipeline_ctx) {
+            if (fragment_id == ctx.first) { // skip the fragment id passed in by the caller
+                continue;
+            }
+            ctx.second->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg); // every other fragment gets INTERNAL_ERROR
+        }
+    }
+    return true;
+}
+} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index bb7ad20b90b..49c2deb0d8c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -40,6 +40,11 @@
 #include "vec/runtime/shared_scanner_controller.h"
 
 namespace doris {
+
+namespace pipeline {
+class PipelineFragmentContext;
+} // namespace pipeline
+
 struct ReportStatusRequest {
     bool is_pipeline_x;
     const Status status;
@@ -138,16 +143,7 @@ public:
     }
 
     [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
-    bool cancel(bool v, std::string msg, Status new_status) {
-        if (_is_cancelled) {
-            return false;
-        }
-        set_exec_status(new_status);
-        _is_cancelled.store(v);
-
-        set_ready_to_execute(true);
-        return true;
-    }
+    bool cancel(bool v, std::string msg, Status new_status, int fragment_id = 
-1);
 
     void set_exec_status(Status new_status) {
         if (new_status.ok()) {
@@ -267,6 +263,8 @@ public:
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
 
     std::vector<TUniqueId> fragment_instance_ids;
+    std::map<int, std::shared_ptr<pipeline::PipelineFragmentContext>> 
fragment_id_to_pipeline_ctx;
+    std::mutex pipeline_lock;
 
     // plan node id -> TFileScanRangeParams
     // only for file scan node
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 94bc7a7bdee..d5d460e80b9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -273,7 +273,7 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int 
be_number) {
               << " #senders=" << _num_remaining_senders;
     if (_num_remaining_senders == 0) {
         if (_dependency) {
-            _dependency->set_eos();
+            _dependency->set_ready();
         }
         _data_arrival_cv.notify_one();
     }
@@ -288,7 +288,7 @@ void VDataStreamRecvr::SenderQueue::cancel(Status 
cancel_status) {
         _is_cancelled = true;
         _cancel_status = cancel_status;
         if (_dependency) {
-            _dependency->set_eos();
+            _dependency->set_ready();
         }
         VLOG_QUERY << "cancelled stream: _fragment_instance_id="
                    << print_id(_recvr->fragment_instance_id())
@@ -318,7 +318,7 @@ void VDataStreamRecvr::SenderQueue::close() {
         std::lock_guard<std::mutex> l(_lock);
         _is_cancelled = true;
         if (_dependency) {
-            _dependency->set_eos();
+            _dependency->set_ready();
         }
 
         for (auto closure_pair : _pending_closures) {
@@ -362,8 +362,8 @@ VDataStreamRecvr::VDataStreamRecvr(
         _sender_to_local_channel_dependency.resize(num_queues);
         for (size_t i = 0; i < num_queues; i++) {
             _sender_to_local_channel_dependency[i] =
-                    
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
-                                                                            
_dest_node_id);
+                    pipeline::LocalExchangeChannelDependency::create_shared(
+                            _dest_node_id, _dest_node_id, 
state->get_query_ctx());
         }
     }
     _sender_queues.reserve(num_queues);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to