This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f755809a901 [fix](sink) The issue with 2GB limit of protocol buffer (#37990)
f755809a901 is described below
commit f755809a9019d8b3340026b00d926df967725b77
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jul 22 13:31:45 2024 +0800
[fix](sink) The issue with 2GB limit of protocol buffer (#37990)
## Proposed changes
```
Fail to serialize doris.PFetchDataResult
```
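If the serialized size of a `PFetchDataResult` message exceeds 2 GB, Protocol Buffers
cannot serialize it. This change splits result blocks larger than
`config::thrift_max_message_size` into smaller sub-blocks before sending them, and
returns an `InternalError` status with an empty batch when a single
`PFetchDataResult` still exceeds the protobuf size limit.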
---
be/src/runtime/buffer_control_block.cpp | 8 +++++
be/src/vec/sink/vmysql_result_writer.cpp | 58 ++++++++++++++++++++++++--------
be/src/vec/sink/vmysql_result_writer.h | 2 ++
3 files changed, 54 insertions(+), 14 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 845afb9a84b..6f8022a0034 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -24,6 +24,7 @@
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
+#include <limits>
#include <ostream>
#include <string>
#include <utility>
@@ -80,6 +81,13 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
result->set_packet_seq(packet_seq);
result->set_eos(eos);
}
+
+ /// The size limit of proto buffer message is 2G
+ if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
+ st = Status::InternalError("Message size exceeds 2GB: {}",
result->ByteSizeLong());
+ result->clear_row_batch();
+ result->set_empty_batch(true);
+ }
st.to_protobuf(result->mutable_status());
{ done->Run(); }
delete this;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index d700d43165d..7fcc7fcf76f 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -30,6 +30,7 @@
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
#include "gutil/integral_types.h"
#include "olap/hll.h"
#include "runtime/buffer_control_block.h"
@@ -140,23 +141,11 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
}
template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block&
input_block) {
- SCOPED_TIMER(_append_row_batch_timer);
+Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState*
state, Block& block) {
Status status = Status::OK();
- if (UNLIKELY(input_block.rows() == 0)) {
- return status;
- }
-
- DCHECK(_output_vexpr_ctxs.empty() != true);
-
- // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
- // failed, just return the error status
- Block block;
-
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
-
input_block, &block));
+ auto num_rows = block.rows();
// convert one batch
auto result = std::make_unique<TFetchDataResult>();
- auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);
uint64_t bytes_sent = 0;
{
@@ -249,6 +238,47 @@ Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
return status;
}
+template <bool is_binary_format>
+Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block&
input_block) { // Evaluate the output exprs on input_block, then hand the result to _write_one_block(), split into sub-blocks if it is too large.
+ SCOPED_TIMER(_append_row_batch_timer);
+ Status status = Status::OK();
+ if (UNLIKELY(input_block.rows() == 0)) {
+ return status; // nothing to send for an empty input block
+ }
+
+ DCHECK(_output_vexpr_ctxs.empty() != true);
+
+ // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+ // failed, just return the error status (NOTE(review): failures actually propagate through RETURN_IF_ERROR below)
+ Block block;
+
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
+
input_block, &block));
+ const auto total_bytes = block.bytes(); // in-memory size of the evaluated block; presumably a proxy for the serialized result size — TODO confirm
+
+ if (total_bytes > config::thrift_max_message_size) [[unlikely]] { // split row-wise into enough sub-blocks that each is expected to fit under the limit
+ const auto total_rows = block.rows(); // NOTE(review): the split assumes rows are roughly uniform in size; a skewed block can still yield an oversized sub-block
+ const auto sub_block_count = (total_bytes +
config::thrift_max_message_size - 1) /
+ config::thrift_max_message_size; // ceil(total_bytes / thrift_max_message_size)
+ const auto sub_block_rows = (total_rows + sub_block_count - 1) /
sub_block_count; // ceil(total_rows / sub_block_count)
+
+ size_t offset = 0;
+ while (offset < total_rows) {
+ size_t rows = std::min(sub_block_rows, total_rows - offset); // the final sub-block may hold fewer rows
+ auto sub_block = block.clone_empty();
+ for (size_t i = 0; i != block.columns(); ++i) {
+ sub_block.get_by_position(i).column =
+ block.get_by_position(i).column->cut(offset, rows); // rows [offset, offset + rows) of column i
+ }
+ offset += rows;
+
+ RETURN_IF_ERROR(_write_one_block(state, sub_block)); // stop at the first failed write
+ }
+ return Status::OK();
+ }
+
+ return _write_one_block(state, block); // common case: the whole block goes through a single _write_one_block() call
+}
+
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::close(Status) {
COUNTER_SET(_sent_rows_counter, _written_rows);
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 1b165ecb748..b89b8cf1b90 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -66,6 +66,8 @@ private:
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const
DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale =
-1);
+ Status _write_one_block(RuntimeState* state, Block& block);
+
BufferControlBlock* _sinker = nullptr;
const VExprContextSPtrs& _output_vexpr_ctxs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]