This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd21de83599 [fix](exchange) Drain exchange receiver queues outside lock (#63331)
bd21de83599 is described below

commit bd21de835990f23003f57f039a99a60197e87abb
Author: zclllyybb <[email protected]>
AuthorDate: Tue May 19 10:22:06 2026 +0800

    [fix](exchange) Drain exchange receiver queues outside lock (#63331)
    
    Problem Summary:
    The real queue mutation paths all re-check _is_cancelled before touching
    _block_queue, but cancel() still used a two-lock pattern and close() ran
    delayed RPC done callbacks while holding the queue mutex. That made
    ownership of queued BlockItem callbacks harder to reason about and left
    some debug reads of queue state outside the mutex.
    
    This change makes cancel() and close() move queued BlockItems into a
    local list while holding _lock, then runs call_done() after the member
    queue is fully detached and the lock is released. It also locks
    get_batch()'s debug-only invariant read and debug_string()'s queue-state
    read.
---
 be/src/exec/exchange/vdata_stream_recvr.cpp       |  55 +++++------
 be/src/exec/exchange/vdata_stream_recvr.h         |   3 +
 be/test/exec/pipeline/vdata_stream_recvr_test.cpp | 109 ++++++++++++++++++++++
 3 files changed, 140 insertions(+), 27 deletions(-)

diff --git a/be/src/exec/exchange/vdata_stream_recvr.cpp b/be/src/exec/exchange/vdata_stream_recvr.cpp
index 50dcff5bbd9..7768db708ea 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.cpp
+++ b/be/src/exec/exchange/vdata_stream_recvr.cpp
@@ -53,24 +53,22 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int n
 }
 
 VDataStreamRecvr::SenderQueue::~SenderQueue() {
-    for (auto& block_item : _block_queue) {
-        block_item.call_done(_recvr);
-    }
+    run_block_queue_done_callbacks(_block_queue);
     _block_queue.clear();
 }
 
 Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
-#ifndef NDEBUG
-    if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
-                               "_is_cancelled: {}, _block_queue_empty: {}, "
-                               "_num_remaining_senders: {}",
-                               _is_cancelled, _block_queue.empty(), _num_remaining_senders);
-    }
-#endif
     BlockItem block_item;
     {
         INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+#ifndef NDEBUG
+        if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "_is_cancelled: {}, _block_queue_empty: {}, "
+                                   "_num_remaining_senders: {}",
+                                   _is_cancelled, _block_queue.empty(), _num_remaining_senders);
+        }
+#endif
         //check and get block_item from data_queue
         if (_is_cancelled) {
             RETURN_IF_ERROR(_cancel_status);
@@ -127,14 +125,21 @@ void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>
     }
 }
 
+void VDataStreamRecvr::SenderQueue::run_block_queue_done_callbacks(
+        std::list<BlockItem>& block_queue) {
+    for (auto& block_item : block_queue) {
+        block_item.call_done(_recvr);
+    }
+}
+
 std::string VDataStreamRecvr::SenderQueue::debug_string() {
+    std::lock_guard<std::mutex> l(_lock);
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, "
                    "_cancel_status: {}, _sender_eos_set: (",
                    _num_remaining_senders, _block_queue.size(), _is_cancelled,
                    _cancel_status.to_string());
-    std::lock_guard<std::mutex> l(_lock);
     for (auto& i : _sender_eos_set) {
         fmt::format_to(debug_string_buffer, "{}, ", i);
     }
@@ -329,6 +334,7 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
 }
 
 void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
+    std::list<BlockItem> block_queue;
     {
         INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         if (_is_cancelled) {
@@ -340,29 +346,24 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
         VLOG_QUERY << "cancelled stream: _fragment_instance_id="
                    << print_id(_recvr->fragment_instance_id())
                    << " node_id=" << _recvr->dest_node_id();
+        block_queue.splice(block_queue.end(), _block_queue);
     }
-    {
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
-        for (auto& block_item : _block_queue) {
-            block_item.call_done(_recvr);
-        }
-        _block_queue.clear();
-    }
+    run_block_queue_done_callbacks(block_queue);
 }
 
 void VDataStreamRecvr::SenderQueue::close() {
     // If _is_cancelled is not set to true, there may be concurrent send
     // which add batch to _block_queue. The batch added after _block_queue
     // is clear will be memory leak
-    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
-    _is_cancelled = true;
-    set_source_ready(l);
-
-    for (auto& block_item : _block_queue) {
-        block_item.call_done(_recvr);
+    std::list<BlockItem> block_queue;
+    {
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+        _is_cancelled = true;
+        set_source_ready(l);
+        block_queue.splice(block_queue.end(), _block_queue);
     }
-    // Delete any batches queued in _block_queue
-    _block_queue.clear();
+    // Release delayed RPC callbacks after the queue state is fully closed.
+    run_block_queue_done_callbacks(block_queue);
 }
 
 VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h
index 9a5d0d35759..2b395ee61f4 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -201,6 +201,8 @@ public:
     void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; }
 
 protected:
+    struct BlockItem;
+
     void add_blocks_memory_usage(int64_t size);
 
     void sub_blocks_memory_usage(int64_t size);
@@ -209,6 +211,7 @@ protected:
     friend class ExchangeLocalState;
 
     void set_source_ready(std::lock_guard<std::mutex>&);
+    void run_block_queue_done_callbacks(std::list<BlockItem>& block_queue);
 
     // Not managed by this class
     VDataStreamRecvr* _recvr = nullptr;
diff --git a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
index f0c4e05c7e6..9f8d1b13a26 100644
--- a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
+++ b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
@@ -19,7 +19,10 @@
 
 #include <gtest/gtest.h>
 
+#include <atomic>
+#include <functional>
 #include <memory>
+#include <mutex>
 #include <thread>
 #include <vector>
 
@@ -306,6 +309,7 @@ TEST_F(DataStreamRecvrTest, TestRandomCloseSender) {
 }
 
 class MockClosure : public google::protobuf::Closure {
+public:
     MockClosure() = default;
 
     ~MockClosure() override = default;
@@ -323,6 +327,111 @@ void to_pblock(Block& block, PBlock* pblock) {
                                 
segment_v2::CompressionTypePB::NO_COMPRESSION));
 }
 
+void run_concurrent_remote_add_and_drain(VDataStreamRecvr::SenderQueue* sender,
+                                         MockVDataStreamRecvr* recvr, bool use_cancel) {
+    constexpr int num_threads = 4;
+    constexpr int blocks_per_thread = 200;
+    constexpr int num_blocks = num_threads * blocks_per_thread;
+    constexpr int min_delayed_callbacks_before_drain = 16;
+
+    recvr->always_exceeds_limit = true;
+
+    std::atomic_bool start {false};
+    std::atomic_int add_attempts {0};
+    std::atomic_int delayed_callbacks {0};
+    std::atomic_int callbacks_run {0};
+    std::mutex closures_lock;
+    std::vector<std::shared_ptr<MockClosure>> closures;
+    closures.reserve(num_blocks);
+
+    auto add_func = [&](int be_number) {
+        while (!start.load(std::memory_order_acquire)) {
+            std::this_thread::yield();
+        }
+
+        for (int packet_seq = 0; packet_seq < blocks_per_thread; ++packet_seq) {
+            add_attempts.fetch_add(1, std::memory_order_relaxed);
+
+            auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+            auto pblock = std::make_unique<PBlock>();
+            to_pblock(block, pblock.get());
+
+            auto closure = std::make_shared<MockClosure>();
+            closure->_cb = [&]() { callbacks_run.fetch_add(1, std::memory_order_relaxed); };
+            {
+                std::lock_guard<std::mutex> lock(closures_lock);
+                closures.push_back(closure);
+            }
+
+            google::protobuf::Closure* done = closure.get();
+            auto st = sender->add_block(std::move(pblock), be_number, packet_seq, &done, 0, 0);
+            EXPECT_TRUE(st) << st.msg();
+            if (done != nullptr) {
+                done->Run();
+            } else {
+                delayed_callbacks.fetch_add(1, std::memory_order_relaxed);
+            }
+        }
+    };
+
+    auto drain_func = [&]() {
+        while (!start.load(std::memory_order_acquire)) {
+            std::this_thread::yield();
+        }
+        while (delayed_callbacks.load(std::memory_order_relaxed) <
+                       min_delayed_callbacks_before_drain &&
+               add_attempts.load(std::memory_order_relaxed) < num_blocks) {
+            std::this_thread::yield();
+        }
+
+        if (use_cancel) {
+            sender->cancel(Status::Cancelled("test cancel"));
+        } else {
+            sender->close();
+        }
+    };
+
+    std::vector<std::thread> threads;
+    for (int i = 0; i < num_threads; ++i) {
+        threads.emplace_back(add_func, i + 1);
+    }
+    threads.emplace_back(drain_func);
+
+    start.store(true, std::memory_order_release);
+    for (auto& thread : threads) {
+        thread.join();
+    }
+
+    EXPECT_EQ(callbacks_run.load(std::memory_order_relaxed), num_blocks);
+    EXPECT_EQ(sender->_block_queue.size(), 0);
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteCancelConcurrentWithAddBlock) {
+    create_recvr(4, false);
+    ASSERT_EQ(recvr->sender_queues().size(), 1);
+
+    auto* sender = recvr->sender_queues().back();
+    run_concurrent_remote_add_and_drain(sender, recvr.get(), true);
+
+    Block block;
+    bool eos = false;
+    auto st = sender->get_batch(&block, &eos);
+    EXPECT_FALSE(st.ok());
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteCloseConcurrentWithAddBlock) {
+    create_recvr(4, false);
+    ASSERT_EQ(recvr->sender_queues().size(), 1);
+
+    auto* sender = recvr->sender_queues().back();
+    run_concurrent_remote_add_and_drain(sender, recvr.get(), false);
+
+    Block block;
+    bool eos = false;
+    auto st = sender->get_batch(&block, &eos);
+    EXPECT_FALSE(st.ok());
+}
+
 TEST_F(DataStreamRecvrTest, TestRemoteSender) {
     create_recvr(3, false);
     EXPECT_EQ(recvr->sender_queues().size(), 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to