This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad3e4d5b655 [opt](MultiCast) Avoid copying while holding a lock (#37462)
ad3e4d5b655 is described below

commit ad3e4d5b6556f2e4003acea92bf1b108aa827fc3
Author: Mryange <[email protected]>
AuthorDate: Mon Jul 15 16:55:52 2024 +0800

    [opt](MultiCast) Avoid copying while holding a lock (#37462)
    
    Previously, each block was copied while the mutex was held.
    Now the block is only fetched while holding the lock, and the
    copy is performed after the lock has been released.
---
 be/src/pipeline/exec/multi_cast_data_streamer.cpp | 115 +++++++++++++---------
 be/src/pipeline/exec/multi_cast_data_streamer.h   |  16 ++-
 2 files changed, 78 insertions(+), 53 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp 
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index deebf7d11bb..d44cf3974a6 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -23,63 +23,97 @@
 
 namespace doris::pipeline {
 
-MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, 
size_t mem_size)
-        : _used_count(used_count), _mem_size(mem_size) {
+MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int 
un_finish_copy,
+                               size_t mem_size)
+        : _used_count(used_count), _un_finish_copy(un_finish_copy), 
_mem_size(mem_size) {
     _block = 
vectorized::Block::create_unique(block->get_columns_with_type_and_name());
     block->clear();
 }
 
 Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    std::lock_guard l(_mutex);
-    auto& pos_to_pull = _sender_pos_to_read[sender_idx];
-    if (pos_to_pull != _multi_cast_blocks.end()) {
-        if (pos_to_pull->_used_count == 1) {
-            DCHECK(pos_to_pull == _multi_cast_blocks.begin());
-            pos_to_pull->_block->swap(*block);
-
-            _cumulative_mem_size -= pos_to_pull->_mem_size;
-            pos_to_pull++;
-            _multi_cast_blocks.pop_front();
-        } else {
-            pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
-            
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
-            pos_to_pull->_used_count--;
-            pos_to_pull++;
+    int* un_finish_copy = nullptr;
+    int use_count = 0;
+    {
+        std::lock_guard l(_mutex);
+        auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+        const auto end = _multi_cast_blocks.end();
+        DCHECK(pos_to_pull != end);
+
+        *block = *pos_to_pull->_block;
+
+        _cumulative_mem_size -= pos_to_pull->_mem_size;
+
+        pos_to_pull->_used_count--;
+        use_count = pos_to_pull->_used_count;
+        un_finish_copy = &pos_to_pull->_un_finish_copy;
+
+        pos_to_pull++;
+
+        if (pos_to_pull == end) {
+            _block_reading(sender_idx);
         }
+
+        *eos = _eos and pos_to_pull == end;
     }
-    *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
-    if (pos_to_pull == _multi_cast_blocks.end()) {
-        _block_reading(sender_idx);
+
+    if (use_count == 0) {
+        // will clear _multi_cast_blocks
+        _wait_copy_block(block, *un_finish_copy);
+    } else {
+        _copy_block(block, *un_finish_copy);
     }
+
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& 
un_finish_copy) {
+    const auto rows = block->rows();
+    for (int i = 0; i < block->columns(); ++i) {
+        block->get_by_position(i).column = 
block->get_by_position(i).column->clone_resized(rows);
+    }
+
+    std::unique_lock l(_mutex);
+    un_finish_copy--;
+    if (un_finish_copy == 0) {
+        l.unlock();
+        _cv.notify_one();
+    }
+}
+
+void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& 
un_finish_copy) {
+    std::unique_lock l(_mutex);
+    _cv.wait(l, [&]() { return un_finish_copy == 0; });
+    _multi_cast_blocks.pop_front();
+}
+
 Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::vectorized::Block* block, bool eos) {
     auto rows = block->rows();
     COUNTER_UPDATE(_process_rows, rows);
 
-    auto block_mem_size = block->allocated_bytes();
-    std::lock_guard l(_mutex);
-    int need_process_count = _cast_sender_count - _closed_sender_count;
-    if (need_process_count == 0) {
-        return Status::EndOfFile("All data streamer is EOF");
-    }
-    // TODO: if the [queue back block rows + block->rows()] < batch_size, 
better
-    // do merge block. but need check the need_process_count and used_count 
whether
-    // equal
-    _multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
+    const auto block_mem_size = block->allocated_bytes();
     _cumulative_mem_size += block_mem_size;
     COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, 
_peak_mem_usage->value()));
 
-    auto end = _multi_cast_blocks.end();
-    end--;
-    for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
-        if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
-            _sender_pos_to_read[i] = end;
-            _set_ready_for_read(i);
+    {
+        std::lock_guard l(_mutex);
+        _multi_cast_blocks.emplace_back(block, _cast_sender_count, 
_cast_sender_count - 1,
+                                        block_mem_size);
+        // last elem
+        auto end = std::prev(_multi_cast_blocks.end());
+        for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+            if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                _sender_pos_to_read[i] = end;
+                _set_ready_for_read(i);
+            }
+        }
+        _eos = eos;
+    }
+
+    if (_eos) {
+        for (auto* read_dep : _dependencies) {
+            read_dep->set_always_ready();
         }
     }
-    _eos = eos;
     return Status::OK();
 }
 
@@ -92,13 +126,6 @@ void MultiCastDataStreamer::_set_ready_for_read(int 
sender_idx) {
     dep->set_ready();
 }
 
-void MultiCastDataStreamer::_set_ready_for_read() {
-    for (auto* dep : _dependencies) {
-        DCHECK(dep);
-        dep->set_ready();
-    }
-}
-
 void MultiCastDataStreamer::_block_reading(int sender_idx) {
     if (_dependencies.empty()) {
         return;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2112ebaaf20..07e64016363 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -23,10 +23,11 @@ namespace doris::pipeline {
 
 class Dependency;
 struct MultiCastBlock {
-    MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
+    MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, 
size_t mem_size);
 
     std::unique_ptr<vectorized::Block> _block;
     int _used_count;
+    int _un_finish_copy;
     size_t _mem_size;
 };
 
@@ -58,12 +59,6 @@ public:
 
     RuntimeProfile* profile() { return _profile; }
 
-    void set_eos() {
-        std::lock_guard l(_mutex);
-        _eos = true;
-        _set_ready_for_read();
-    }
-
     void set_dep_by_sender_idx(int sender_idx, Dependency* dep) {
         _dependencies[sender_idx] = dep;
         _block_reading(sender_idx);
@@ -71,17 +66,20 @@ public:
 
 private:
     void _set_ready_for_read(int sender_idx);
-    void _set_ready_for_read();
     void _block_reading(int sender_idx);
 
+    void _copy_block(vectorized::Block* block, int& un_finish_copy);
+
+    void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);
+
     const RowDescriptor& _row_desc;
     RuntimeProfile* _profile = nullptr;
     std::list<MultiCastBlock> _multi_cast_blocks;
     std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
+    std::condition_variable _cv;
     std::mutex _mutex;
     bool _eos = false;
     int _cast_sender_count = 0;
-    int _closed_sender_count = 0;
     int64_t _cumulative_mem_size = 0;
 
     RuntimeProfile::Counter* _process_rows = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to