This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 68e7cc57eff [fix](runtime) Avoid merging results into one large result
in BufferControlBlock (#49602)
68e7cc57eff is described below
commit 68e7cc57eff33c4682ef256f0994d347a8bb02d8
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Mar 30 13:52:17 2025 +0800
[fix](runtime) Avoid merging results into one large result in
BufferControlBlock (#49602)
### What problem does this PR solve?
Avoid the RPC error `FLOW_CONTROL_ERROR`.
Problem Summary:
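Merging results into an excessively large batch can cause the BRPC
transmission to fail with the error `FLOW_CONTROL_ERROR`. This change stops
merging an incoming batch into the last queued batch once their combined size
would exceed `config::thrift_max_message_size`.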
---
be/src/runtime/result_block_buffer.cpp | 12 +++++++++++-
be/src/runtime/result_block_buffer.h | 4 ++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/result_block_buffer.cpp
b/be/src/runtime/result_block_buffer.cpp
index 9b26c0d4b28..47af430a17c 100644
--- a/be/src/runtime/result_block_buffer.cpp
+++ b/be/src/runtime/result_block_buffer.cpp
@@ -31,6 +31,7 @@
#include <vector>
#include "arrow/type_fwd.h"
+#include "common/config.h"
#include "pipeline/dependency.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
@@ -189,10 +190,15 @@ Status
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
if (_waiting_rpc.empty()) {
auto sz = 0;
auto num_rows = 0;
+ size_t batch_size = 0;
if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
num_rows = result->rows();
+ batch_size = result->bytes();
} else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
num_rows = result->result_batch.rows.size();
+ for (const auto& row : result->result_batch.rows) {
+ batch_size += row.size();
+ }
}
if (!_result_batch_queue.empty()) {
if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
@@ -200,7 +206,8 @@ Status
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
} else if constexpr (std::is_same_v<InBlockType,
TFetchDataResult>) {
sz = _result_batch_queue.back()->result_batch.rows.size();
}
- if (sz + num_rows < _buffer_limit) {
+ if (sz + num_rows < _buffer_limit &&
+ (batch_size + _last_batch_bytes) <=
config::thrift_max_message_size) {
if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
auto last_block = _result_batch_queue.back();
for (size_t i = 0; i < last_block->columns(); i++) {
@@ -214,15 +221,18 @@ Status
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
std::make_move_iterator(result_rows.end()));
}
+ _last_batch_bytes += batch_size;
} else {
_instance_rows_in_queue.emplace_back();
_result_batch_queue.push_back(std::move(result));
+ _last_batch_bytes = batch_size;
_arrow_data_arrival
.notify_one(); // Only valid for
get_arrow_batch(std::shared_ptr<vectorized::Block>,)
}
} else {
_instance_rows_in_queue.emplace_back();
_result_batch_queue.push_back(std::move(result));
+ _last_batch_bytes = batch_size;
_arrow_data_arrival
.notify_one(); // Only valid for
get_arrow_batch(std::shared_ptr<vectorized::Block>,)
}
diff --git a/be/src/runtime/result_block_buffer.h
b/be/src/runtime/result_block_buffer.h
index 0160fde3787..4cd738c2009 100644
--- a/be/src/runtime/result_block_buffer.h
+++ b/be/src/runtime/result_block_buffer.h
@@ -102,6 +102,10 @@ protected:
// protects all subsequent data in this block
std::mutex _lock;
+ // The last batch size in bytes.
+ // Determine whether to merge multiple batches based on the size of each
batch to avoid getting an excessively large batch after merging.
+ size_t _last_batch_bytes = 0;
+
// get arrow flight result is a sync method, need wait for data ready and
return result.
// TODO, waiting for data will block pipeline, so use a request pool to
save requests waiting for data.
std::condition_variable _arrow_data_arrival;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]