This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 79ae613136 [fix](tablet sink) check data valid of tablet sink data (#23531)
79ae613136 is described below
commit 79ae613136d5b4a4e1cf8d3f939aec84e6f80c92
Author: TengJianPing <[email protected]>
AuthorDate: Sun Aug 27 17:32:46 2023 +0800
[fix](tablet sink) check data valid of tablet sink data (#23531)
* [fix](tablet sink) check data valid of tablet sink data
* add check
---
be/src/runtime/tablets_channel.cpp | 8 ++++++++
be/src/vec/sink/vtablet_sink.cpp | 6 ++++++
2 files changed, 14 insertions(+)
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 70a0304056..b6501ede43 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -467,6 +467,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
}
}
+ size_t row_count = 0;
auto get_send_data = [&]() {
if constexpr (std::is_same_v<TabletWriterAddRequest,
PTabletWriterAddBatchRequest>) {
return RowBatch(*_row_desc, request.row_batch());
@@ -476,6 +477,13 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
};
auto send_data = get_send_data();
+ if constexpr (std::is_same_v<TabletWriterAddRequest,
PTabletWriterAddBatchRequest>) {
+ row_count = send_data.num_rows();
+ } else {
+ row_count = send_data.rows();
+ }
+ CHECK(row_count == request.tablet_ids_size())
+ << "block rows: " << row_count << ", tablet_ids_size: " <<
request.tablet_ids_size();
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b1a4e17545..e8d022682b 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -286,6 +286,8 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
// tablet_ids has already set when add row
request.set_packet_seq(_next_packet_seq);
auto block = mutable_block->to_block();
+ CHECK(block.rows() == request.tablet_ids_size())
+ << "block rows: " << block.rows() << ", tablet_ids_size: " <<
request.tablet_ids_size();
if (block.rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
@@ -298,6 +300,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
_add_block_closure->clear_in_flight();
return;
}
+ {
+ vectorized::Block tmp_block(*request.mutable_block());
+ CHECK(block.rows() == tmp_block.rows());
+ }
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
LOG(WARNING) << "send block too large, this rpc may failed. send
size: "
<< compressed_bytes << ", threshold: " <<
config::brpc_max_body_size
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]