This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d31d99bf34 [pipeline](load) opt the pipeline load code (#24708)
d31d99bf34 is described below

commit d31d99bf3443ad55c784d242178eba3d97e6b874
Author: HappenLee <[email protected]>
AuthorDate: Thu Sep 21 15:20:31 2023 +0800

    [pipeline](load) opt the pipeline load code (#24708)
    
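    Optimize the pipeline load code: AsyncResultWriter now moves each sink block into a recycled (or newly reserved) Block via swap instead of copying it with MutableBlock::merge, and Block::create_same_struct_block gains an is_reserve option.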
---
 be/src/vec/core/block.cpp                      |  8 +++++--
 be/src/vec/core/block.h                        |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp | 30 ++++++++++++++++++++------
 be/src/vec/sink/writer/async_result_writer.h   | 10 ++++++++-
 4 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index e133834e49..9988ae9944 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -1050,11 +1050,15 @@ std::string MutableBlock::dump_data(size_t row_limit) const {
     return out.str();
 }
 
-std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const {
+std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool 
is_reserve) const {
     auto temp_block = Block::create_unique();
     for (const auto& d : data) {
         auto column = d.type->create_column();
-        column->resize(size);
+        if (is_reserve) {
+            column->reserve(size);
+        } else {
+            column->resize(size);
+        }
         temp_block->insert({std::move(column), d.type, d.name});
     }
     return temp_block;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 5edf9d1e70..5f52258972 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -315,7 +315,7 @@ public:
 
     Status deserialize(const PBlock& pblock);
 
-    std::unique_ptr<Block> create_same_struct_block(size_t size) const;
+    std::unique_ptr<Block> create_same_struct_block(size_t size, bool is_reserve = false) const; // same column types/names; each column resize(size)'d, or only reserve(size)'d if is_reserve
 
     /** Compares (*this) n-th row and rhs m-th row.
       * Returns negative number, 0, or positive number  (*this) n-th row is less, equal, greater than rhs m-th row respectively.
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 8d29c1441d..55cf580f09 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -39,19 +39,18 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
     auto status = Status::OK();
     std::unique_ptr<Block> add_block;
     if (rows) {
-        add_block = block->create_same_struct_block(0);
+        add_block = _get_free_block(block, rows);
     }
 
-    std::lock_guard l(_m);
     // if io task failed, just return error status to
     // end the query
     if (!_writer_status.ok()) {
         return _writer_status;
     }
 
+    std::lock_guard l(_m);
     _eos = eos;
     if (rows) {
-        
RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
         _data_queue.emplace_back(std::move(add_block));
     } else if (_eos && _data_queue.empty()) {
         status = Status::EndOfFile("Run out of sink data");
@@ -81,7 +80,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
 
     if (_writer_status.ok()) {
         while (true) {
-            {
+            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
                 std::unique_lock l(_m);
                 while (!_eos && _data_queue.empty() && _writer_status.ok()) {
                     _cv.wait(l);
@@ -93,10 +92,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
                 break;
             }
 
-            auto status = write(get_block_from_queue());
-            std::unique_lock l(_m);
-            _writer_status = status;
+            auto block = get_block_from_queue();
+            auto status = write(block);
+            _return_free_block(std::move(block));
+
             if (!status.ok()) {
+                std::unique_lock l(_m);
+                _writer_status = status;
                 break;
             }
         }
@@ -129,5 +131,19 @@ void AsyncResultWriter::force_close(Status s) {
     _cv.notify_one();
 }
 
+void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) { // hand a block back to the pool after write()
+    _free_blocks.enqueue(std::move(b)); // its rows are NOT cleared here; _get_free_block() later swaps them into the caller's block
+}
+
+std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
+                                                          int rows) { // move *block's data into a pooled Block without a column copy
+    std::unique_ptr<Block> b;
+    if (!_free_blocks.try_dequeue(b)) {
+        b = block->create_same_struct_block(rows, true); // pool empty: same column types/names, capacity reserved for `rows`
+    }
+    b->swap(*block); // b takes the data; *block gets b's old contents (NOTE(review): leftovers if b was recycled)
+    return b;
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 9806d20945..371d30ea7c 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -16,6 +16,8 @@
 // under the License.
 
 #pragma once
+#include <concurrentqueue.h>
+
 #include <condition_variable>
 #include <queue>
 
@@ -60,7 +62,7 @@ public:
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
 
-    Status write(std::unique_ptr<Block> block) { return append_block(*block); }
+    Status write(std::unique_ptr<Block>& block) { return append_block(*block); } // borrows the block (no ownership transfer) and passes *block to append_block()
 
     bool can_write() {
         std::lock_guard l(_m);
@@ -83,6 +85,10 @@ protected:
     Status _projection_block(Block& input_block, Block* output_block);
     const VExprContextSPtrs& _vec_output_expr_ctxs;
 
+    std::unique_ptr<Block> _get_free_block(Block*, int rows);
+    // Puts a Block back into _free_blocks so a later _get_free_block() call can reuse it.
+    void _return_free_block(std::unique_ptr<Block>);
+
 private:
     static constexpr auto QUEUE_SIZE = 3;
     std::mutex _m;
@@ -92,6 +98,8 @@ private:
     bool _eos = false;
     bool _need_normal_close = true;
     bool _writer_thread_closed = false;
+    // Block pool filled by _return_free_block() and drained by _get_free_block(); both are called without holding _m.
+    moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
 };
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to