This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b2f9fde16a0 [Bug](exchange) fix dcheck fail when VDataStreamRecvr 
input empty block (#22992) (#36721)
b2f9fde16a0 is described below

commit b2f9fde16a0a0ff71e3d5c689730ae5ae8b8e824
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jul 12 10:43:51 2024 +0800

    [Bug](exchange) fix dcheck fail when VDataStreamRecvr input empty block 
(#22992) (#36721)
    
    fix dcheck fail when VDataStreamRecvr input empty block
    
    pick #22992
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 .../vec/runtime/shared_hash_table_controller.cpp   |  3 +--
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 24 +++++++++++++++-------
 be/src/vec/runtime/vdata_stream_recvr.h            |  4 +++-
 3 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 9353bdbbec2..490a2cfdcda 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -57,8 +57,7 @@ bool SharedHashTableController::should_build_hash_table(const 
TUniqueId& fragmen
 
 SharedHashTableContextPtr SharedHashTableController::get_context(int 
my_node_id) {
     std::lock_guard<std::mutex> lock(_mutex);
-    auto it = _shared_contexts.find(my_node_id);
-    if (it == _shared_contexts.cend()) {
+    if (!_shared_contexts.count(my_node_id)) {
         _shared_contexts.insert({my_node_id, 
std::make_shared<SharedHashTableContext>()});
     }
     return _shared_contexts[my_node_id];
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 6e25e7160f8..f0bc73682f1 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -160,7 +160,11 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
     COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
-    _block_queue.emplace_back(std::move(block), block_byte_size);
+    bool empty = !block->rows();
+
+    if (!empty) {
+        _block_queue.emplace_back(std::move(block), block_byte_size);
+    }
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
         MonotonicStopWatch monotonicStopWatch;
@@ -169,8 +173,10 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    _recvr->update_blocks_memory_usage(block_byte_size);
-    _data_arrival_cv.notify_one();
+    _recvr->_blocks_memory_usage->add(block_byte_size);
+    if (!empty) {
+        _data_arrival_cv.notify_one();
+    }
     return Status::OK();
 }
 
@@ -207,8 +213,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* 
block, bool use_move) {
     COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
-    _block_queue.emplace_back(std::move(nblock), block_mem_size);
-    _data_arrival_cv.notify_one();
+    bool empty = !nblock->rows();
+
+    if (!empty) {
+        _block_queue.emplace_back(std::move(nblock), block_mem_size);
+        _data_arrival_cv.notify_one();
+    }
 
     // Careful: Accessing members of _recvr that are allocated by Object pool
     // should be done before the following logic, because the _lock will be 
released
@@ -389,8 +399,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
 }
 
 bool VDataStreamRecvr::ready_to_read() {
-    for (size_t i = 0; i < _sender_queues.size(); i++) {
-        if (_sender_queues[i]->should_wait()) {
+    for (const auto& queue : _sender_queues) {
+        if (queue->should_wait()) {
             return false;
         }
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index b92aeadc8b1..8d467412a60 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -239,7 +239,9 @@ public:
     }
 
     void add_block(Block* block, bool use_move) override {
-        if (block->rows() == 0) return;
+        if (block->rows() == 0) {
+            return;
+        }
         {
             std::unique_lock<std::mutex> l(_lock);
             if (_is_cancelled) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to