This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 79ae613136 [fix](tablet sink) check data valid of tablet sink data (#23531)
79ae613136 is described below

commit 79ae613136d5b4a4e1cf8d3f939aec84e6f80c92
Author: TengJianPing <[email protected]>
AuthorDate: Sun Aug 27 17:32:46 2023 +0800

    [fix](tablet sink) check data valid of tablet sink data (#23531)
    
    * [fix](tablet sink) check data valid of tablet sink data
    
    * add check
---
 be/src/runtime/tablets_channel.cpp | 8 ++++++++
 be/src/vec/sink/vtablet_sink.cpp   | 6 ++++++
 2 files changed, 14 insertions(+)

diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 70a0304056..b6501ede43 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -467,6 +467,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
         }
     }
 
+    size_t row_count = 0;
     auto get_send_data = [&]() {
         if constexpr (std::is_same_v<TabletWriterAddRequest, 
PTabletWriterAddBatchRequest>) {
             return RowBatch(*_row_desc, request.row_batch());
@@ -476,6 +477,13 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     };
 
     auto send_data = get_send_data();
+    if constexpr (std::is_same_v<TabletWriterAddRequest, 
PTabletWriterAddBatchRequest>) {
+        row_count = send_data.num_rows();
+    } else {
+        row_count = send_data.rows();
+    }
+    CHECK(row_count == request.tablet_ids_size())
+            << "block rows: " << row_count << ", tablet_ids_size: " << 
request.tablet_ids_size();
     google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
             response->mutable_tablet_errors();
     for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b1a4e17545..e8d022682b 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -286,6 +286,8 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
     // tablet_ids has already set when add row
     request.set_packet_seq(_next_packet_seq);
     auto block = mutable_block->to_block();
+    CHECK(block.rows() == request.tablet_ids_size())
+        << "block rows: " << block.rows() << ", tablet_ids_size: " << 
request.tablet_ids_size();
     if (block.rows() > 0) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
@@ -298,6 +300,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
             _add_block_closure->clear_in_flight();
             return;
         }
+        {
+            vectorized::Block tmp_block(*request.mutable_block());
+            CHECK(block.rows() == tmp_block.rows());
+        }
         if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
             LOG(WARNING) << "send block too large, this rpc may failed. send 
size: "
                          << compressed_bytes << ", threshold: " << 
config::brpc_max_body_size


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to