This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 11f8b53016b [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49571)
11f8b53016b is described below

commit 11f8b53016b49b2267cedd53d22c5d5d3472e5f5
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Mar 27 19:49:58 2025 +0800

    [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49571)
---
 be/src/runtime/buffer_control_block.cpp | 9 ++++++++-
 be/src/runtime/buffer_control_block.h   | 1 +
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index af7ebf2ec64..37f9edf2867 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -23,6 +23,7 @@
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
 // IWYU pragma: no_include <bits/chrono.h>
+#include <algorithm>
 #include <chrono> // IWYU pragma: keep
 #include <limits>
 #include <ostream>
@@ -32,6 +33,7 @@
 
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
+#include "common/config.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
@@ -206,17 +208,22 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result,
     }
 
     if (_waiting_rpc.empty()) {
+        size_t bytes = 0;
+        std::for_each(result->result_batch.rows.cbegin(), 
result->result_batch.rows.cend(),
+                      [&bytes](const std::string& row) { bytes += row.size(); 
});
         // Merge result into batch to reduce rpc times
         if (!_fe_result_batch_queue.empty() &&
             ((_fe_result_batch_queue.back()->result_batch.rows.size() + 
num_rows) <
              _buffer_limit) &&
-            !result->eos) {
+            !result->eos && (bytes + _last_batch_bytes) <= 
config::thrift_max_message_size) {
             std::vector<std::string>& back_rows = 
_fe_result_batch_queue.back()->result_batch.rows;
             std::vector<std::string>& result_rows = result->result_batch.rows;
             back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
                              std::make_move_iterator(result_rows.end()));
+            _last_batch_bytes += bytes;
         } else {
             _fe_result_batch_queue.push_back(std::move(result));
+            _last_batch_bytes = bytes;
         }
         _buffer_rows += num_rows;
     } else {
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 724d86a2dc5..1ee6c04a803 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -155,6 +155,7 @@ protected:
     std::atomic_int _buffer_rows;
     const int _buffer_limit;
     int64_t _packet_num;
+    size_t _last_batch_bytes = 0;
 
     // blocking queue for batch
     FeResultQueue _fe_result_batch_queue;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to