This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2891f8026bc [fix](runtime) Avoid merging results into one large result
in BufferControlBlock (#49571) (#49639)
2891f8026bc is described below
commit 2891f8026bc6e2ed49b2929721d747a150451024
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jun 20 17:31:24 2025 +0800
[fix](runtime) Avoid merging results into one large result in
BufferControlBlock (#49571) (#49639)
### What problem does this PR solve?
Pick #37990 and #49571
Related PR: #xxx
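Problem Summary: BufferControlBlock::add_batch merged incoming results into
the last queued batch based only on the row count, so the merged result could
grow larger than config::thrift_max_message_size. This change tracks the byte
size of the last queued batch and stops merging once the limit would be
exceeded. VMysqlResultWriter::append_block also splits a block whose size
exceeds the limit into smaller sub-blocks before appending them.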
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here, e.g.
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merges this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/runtime/buffer_control_block.cpp | 8 +++++++-
be/src/runtime/buffer_control_block.h | 1 +
be/src/vec/sink/vmysql_result_writer.cpp | 32 ++++++++++++++++++++++++++++++--
be/src/vec/sink/vmysql_result_writer.h | 2 ++
4 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 62c1763861a..da4d8ef4fe6 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -23,6 +23,7 @@
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
+#include <algorithm>
#include <chrono> // IWYU pragma: keep
#include <ostream>
#include <string>
@@ -130,16 +131,21 @@ Status
BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
}
if (_waiting_rpc.empty()) {
+ size_t bytes = 0;
+ std::for_each(result->result_batch.rows.cbegin(),
result->result_batch.rows.cend(),
+ [&bytes](const std::string& row) { bytes += row.size();
});
// Merge result into batch to reduce rpc times
if (!_batch_queue.empty() &&
((_batch_queue.back()->result_batch.rows.size() + num_rows) <
_buffer_limit) &&
- !result->eos) {
+ !result->eos && (bytes + _last_batch_bytes) <=
config::thrift_max_message_size) {
std::vector<std::string>& back_rows =
_batch_queue.back()->result_batch.rows;
std::vector<std::string>& result_rows = result->result_batch.rows;
back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
std::make_move_iterator(result_rows.end()));
+ _last_batch_bytes += bytes;
} else {
_batch_queue.push_back(std::move(result));
+ _last_batch_bytes = bytes;
}
_buffer_rows += num_rows;
_data_arrival.notify_one();
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 0bb38c54e00..4087108c1f0 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -103,6 +103,7 @@ protected:
std::atomic_int _buffer_rows;
const int _buffer_limit;
int64_t _packet_num;
+ size_t _last_batch_bytes = 0;
// blocking queue for batch
ResultQueue _batch_queue;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index 622e3811164..4ce8a6f827b 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -601,9 +601,8 @@ int
VMysqlResultWriter<is_binary_format>::_add_one_cell(const ColumnPtr& column_
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
- Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
- return status;
+ return Status::OK();
}
DCHECK(_output_vexpr_ctxs.empty() != true);
@@ -613,7 +612,36 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
+
+ const auto total_bytes = block.bytes();
+ if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
+ const auto total_rows = block.rows();
+ const auto sub_block_count = (total_bytes +
config::thrift_max_message_size - 1) /
+ config::thrift_max_message_size;
+ const auto sub_block_rows = (total_rows + sub_block_count - 1) /
sub_block_count;
+
+ size_t offset = 0;
+ while (offset < total_rows) {
+ size_t rows = std::min(static_cast<size_t>(sub_block_rows),
total_rows - offset);
+ auto sub_block = block.clone_empty();
+ for (size_t i = 0; i != block.columns(); ++i) {
+ sub_block.get_by_position(i).column =
+ block.get_by_position(i).column->cut(offset, rows);
+ }
+ offset += rows;
+
+ RETURN_IF_ERROR(_append_block(sub_block));
+ }
+ return Status::OK();
+ }
+
+ return _append_block(block);
+}
+
+template <bool is_binary_format>
+Status VMysqlResultWriter<is_binary_format>::_append_block(Block& block) {
// convert one batch
+ Status status = Status::OK();
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index 1761a1a3bef..77f6f8e5e67 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -66,6 +66,8 @@ private:
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const
DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale =
-1);
+ Status _append_block(Block& block);
+
BufferControlBlock* _sinker;
const VExprContextSPtrs& _output_vexpr_ctxs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]