This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f15d93f7b9 [fix](pipeline) fix exception safety issue in 
MultiCastDataStreamer (#36748)
9f15d93f7b9 is described below

commit 9f15d93f7b9b4adf298d484e265b2f1eebf9c955
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jun 26 17:33:20 2024 +0800

    [fix](pipeline) fix exception safety issue in MultiCastDataStreamer (#36748)
    
    ## Proposed changes
    
    ```cpp
    RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block))
    ```
    this line may throw an exception(cannot allocate)
---
 be/src/pipeline/exec/multi_cast_data_streamer.cpp | 20 +-------------------
 be/src/pipeline/exec/multi_cast_data_streamer.h   |  2 --
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp 
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index d3047c42a2d..deebf7d11bb 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -41,9 +41,9 @@ Status MultiCastDataStreamer::pull(int sender_idx, 
doris::vectorized::Block* blo
             pos_to_pull++;
             _multi_cast_blocks.pop_front();
         } else {
-            pos_to_pull->_used_count--;
             pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
             
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
+            pos_to_pull->_used_count--;
             pos_to_pull++;
         }
     }
@@ -54,24 +54,6 @@ Status MultiCastDataStreamer::pull(int sender_idx, 
doris::vectorized::Block* blo
     return Status::OK();
 }
 
-void MultiCastDataStreamer::close_sender(int sender_idx) {
-    std::lock_guard l(_mutex);
-    auto& pos_to_pull = _sender_pos_to_read[sender_idx];
-    while (pos_to_pull != _multi_cast_blocks.end()) {
-        if (pos_to_pull->_used_count == 1) {
-            DCHECK(pos_to_pull == _multi_cast_blocks.begin());
-            _cumulative_mem_size -= pos_to_pull->_mem_size;
-            pos_to_pull++;
-            _multi_cast_blocks.pop_front();
-        } else {
-            pos_to_pull->_used_count--;
-            pos_to_pull++;
-        }
-    }
-    _closed_sender_count++;
-    _block_reading(sender_idx);
-}
-
 Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::vectorized::Block* block, bool eos) {
     auto rows = block->rows();
     COUNTER_UPDATE(_process_rows, rows);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 0a1276c4f1b..2112ebaaf20 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -52,8 +52,6 @@ public:
 
     Status pull(int sender_idx, vectorized::Block* block, bool* eos);
 
-    void close_sender(int sender_idx);
-
     Status push(RuntimeState* state, vectorized::Block* block, bool eos);
 
     const RowDescriptor& row_desc() { return _row_desc; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to