This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2891f8026bc [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49571) (#49639)
2891f8026bc is described below

commit 2891f8026bc6e2ed49b2929721d747a150451024
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jun 20 17:31:24 2025 +0800

    [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49571) (#49639)
    
    ### What problem does this PR solve?
    
    Pick #37990 and #49571
    
    Related PR: #xxx
    
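    Problem Summary:
    
    BufferControlBlock::add_batch decided whether to merge an incoming result
    into the last queued batch using only the row count, so a merged batch
    could grow past config::thrift_max_message_size. This change tracks the
    byte size of the last queued batch and stops merging once the combined
    size would exceed that limit. VMysqlResultWriter::append_block now also
    splits a block larger than config::thrift_max_message_size into smaller
    sub-blocks before converting them.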
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/runtime/buffer_control_block.cpp  |  8 +++++++-
 be/src/runtime/buffer_control_block.h    |  1 +
 be/src/vec/sink/vmysql_result_writer.cpp | 32 ++++++++++++++++++++++++++++++--
 be/src/vec/sink/vmysql_result_writer.h   |  2 ++
 4 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 62c1763861a..da4d8ef4fe6 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -23,6 +23,7 @@
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
 // IWYU pragma: no_include <bits/chrono.h>
+#include <algorithm>
 #include <chrono> // IWYU pragma: keep
 #include <ostream>
 #include <string>
@@ -130,16 +131,21 @@ Status 
BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
     }
 
     if (_waiting_rpc.empty()) {
+        size_t bytes = 0;
+        std::for_each(result->result_batch.rows.cbegin(), 
result->result_batch.rows.cend(),
+                      [&bytes](const std::string& row) { bytes += row.size(); 
});
         // Merge result into batch to reduce rpc times
         if (!_batch_queue.empty() &&
             ((_batch_queue.back()->result_batch.rows.size() + num_rows) < 
_buffer_limit) &&
-            !result->eos) {
+            !result->eos && (bytes + _last_batch_bytes) <= 
config::thrift_max_message_size) {
             std::vector<std::string>& back_rows = 
_batch_queue.back()->result_batch.rows;
             std::vector<std::string>& result_rows = result->result_batch.rows;
             back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
                              std::make_move_iterator(result_rows.end()));
+            _last_batch_bytes += bytes;
         } else {
             _batch_queue.push_back(std::move(result));
+            _last_batch_bytes = bytes;
         }
         _buffer_rows += num_rows;
         _data_arrival.notify_one();
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 0bb38c54e00..4087108c1f0 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -103,6 +103,7 @@ protected:
     std::atomic_int _buffer_rows;
     const int _buffer_limit;
     int64_t _packet_num;
+    size_t _last_batch_bytes = 0;
 
     // blocking queue for batch
     ResultQueue _batch_queue;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp 
b/be/src/vec/sink/vmysql_result_writer.cpp
index 622e3811164..4ce8a6f827b 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -601,9 +601,8 @@ int 
VMysqlResultWriter<is_binary_format>::_add_one_cell(const ColumnPtr& column_
 template <bool is_binary_format>
 Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
-    Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
-        return status;
+        return Status::OK();
     }
 
     DCHECK(_output_vexpr_ctxs.empty() != true);
@@ -613,7 +612,36 @@ Status 
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
     Block block;
     
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                        
input_block, &block));
+
+    const auto total_bytes = block.bytes();
+    if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
+        const auto total_rows = block.rows();
+        const auto sub_block_count = (total_bytes + 
config::thrift_max_message_size - 1) /
+                                     config::thrift_max_message_size;
+        const auto sub_block_rows = (total_rows + sub_block_count - 1) / 
sub_block_count;
+
+        size_t offset = 0;
+        while (offset < total_rows) {
+            size_t rows = std::min(static_cast<size_t>(sub_block_rows), 
total_rows - offset);
+            auto sub_block = block.clone_empty();
+            for (size_t i = 0; i != block.columns(); ++i) {
+                sub_block.get_by_position(i).column =
+                        block.get_by_position(i).column->cut(offset, rows);
+            }
+            offset += rows;
+
+            RETURN_IF_ERROR(_append_block(sub_block));
+        }
+        return Status::OK();
+    }
+
+    return _append_block(block);
+}
+
+template <bool is_binary_format>
+Status VMysqlResultWriter<is_binary_format>::_append_block(Block& block) {
     // convert one batch
+    Status status = Status::OK();
     auto result = std::make_unique<TFetchDataResult>();
     auto num_rows = block.rows();
     result->result_batch.rows.resize(num_rows);
diff --git a/be/src/vec/sink/vmysql_result_writer.h 
b/be/src/vec/sink/vmysql_result_writer.h
index 1761a1a3bef..77f6f8e5e67 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -66,6 +66,8 @@ private:
     int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const 
DataTypePtr& type,
                       MysqlRowBuffer<is_binary_format>& buffer, int scale = 
-1);
 
+    Status _append_block(Block& block);
+
     BufferControlBlock* _sinker;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to