This is an automated email from the ASF dual-hosted git repository.

Mryange pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 17bbba45a52 [fix](pipeline) avoid data queue sink dependency lost wakeup (#63055)
17bbba45a52 is described below

commit 17bbba45a52e6e6ace99773930f83dc1569897f7
Author: Mryange <[email protected]>
AuthorDate: Thu May 7 21:03:37 2026 +0800

    [fix](pipeline) avoid data queue sink dependency lost wakeup (#63055)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Problem Summary:
    `DataQueueTest.MultiTest` could intermittently hang after DataQueue
    moved sink dependency notifications outside the per-sub-queue lock. Root
    cause: `SubQueue` queue state and `sink_dependency` state were no longer
    serialized by `queue_lock`, so a producer could observe its sink
    dependency as blocked even after the queue had already become empty,
    leaving no future push/pop to wake it. For example, `try_push` could
    decide to block the sink and release `queue_lock`; `try_pop` could then
    drain the queue and call `set_ready()`; and only afterwards would
    `try_push` call `block()`, leaving the sink blocked on an empty queue.
    This patch calls `sink_dependency->set_ready()` and
    `sink_dependency->block()` while holding `queue_lock`, keeping queue
    occupancy and sink readiness transitions atomic with respect to each
    other.
    
    
    
    Related PR: https://github.com/apache/doris/pull/62947
---
 be/src/exec/operator/data_queue.cpp | 44 ++++++++++++++-----------------------
 1 file changed, 17 insertions(+), 27 deletions(-)

diff --git a/be/src/exec/operator/data_queue.cpp b/be/src/exec/operator/data_queue.cpp
index f460103d2ea..752e972c3e6 100644
--- a/be/src/exec/operator/data_queue.cpp
+++ b/be/src/exec/operator/data_queue.cpp
@@ -29,38 +29,28 @@
 namespace doris {
 
 void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
-    bool need_notify_sink_ready = false;
-    {
-        LockGuard l(queue_lock);
-        if (!blocks.empty()) {
-            *output_block = std::move(blocks.front());
-            blocks.pop_front();
-            bytes_in_queue -= (*output_block)->allocated_bytes();
-            blocks_in_queue -= 1;
-            need_notify_sink_ready = blocks.empty();
+    LockGuard l(queue_lock);
+    if (!blocks.empty()) {
+        *output_block = std::move(blocks.front());
+        blocks.pop_front();
+        bytes_in_queue -= (*output_block)->allocated_bytes();
+        blocks_in_queue -= 1;
+        if (blocks.empty()) {
+            sink_dependency->set_ready();
         }
     }
-    // Notify outside of queue_lock to avoid nested locks.
-    if (need_notify_sink_ready) {
-        sink_dependency->set_ready();
-    }
 }
 
 bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& 
total_counter) {
-    bool need_block_sink = false;
-    {
-        LockGuard l(queue_lock);
-        if (is_finished) {
-            return false;
-        }
-        total_counter++;
-        bytes_in_queue += block->allocated_bytes();
-        blocks.emplace_back(std::move(block));
-        blocks_in_queue += 1;
-        need_block_sink = (static_cast<int64_t>(blocks.size()) > 
max_blocks_in_queue.load());
-    }
-    // Notify outside of queue_lock to avoid nested locks.
-    if (need_block_sink) {
+    LockGuard l(queue_lock);
+    if (is_finished) {
+        return false;
+    }
+    total_counter++;
+    bytes_in_queue += block->allocated_bytes();
+    blocks.emplace_back(std::move(block));
+    blocks_in_queue += 1;
+    if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
         sink_dependency->block();
     }
     return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to