This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e1a9ec6231 [branch-2.0-pick] pick "[Fix](merge-on-write) Check the 
returned filtered rows from different replicas" #24191 (#24400)
e1a9ec6231 is described below

commit e1a9ec6231ce2b930e017072a48b8c1a89ca5f56
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 14 21:26:30 2023 +0800

    [branch-2.0-pick] pick "[Fix](merge-on-write) Check the returned filtered 
rows from different replicas" #24191 (#24400)
---
 be/src/runtime/runtime_state.h                     |  4 +-
 be/src/vec/sink/vtablet_sink.cpp                   | 53 +++++++++++++++++++++-
 be/src/vec/sink/vtablet_sink.h                     | 17 +++++++
 .../test_partial_update_strict_mode.groovy         |  9 ++--
 4 files changed, 75 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ac9c2bcdb0..9ef2eb3c7a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -269,8 +269,8 @@ public:
         _num_rows_load_unselected.fetch_add(num_rows);
     }
 
-    void update_num_rows_filtered_in_strict_mode_partial_update(int64_t 
num_rows) {
-        _num_rows_filtered_in_strict_mode_partial_update += num_rows;
+    void set_num_rows_filtered_in_strict_mode_partial_update(int64_t num_rows) 
{
+        _num_rows_filtered_in_strict_mode_partial_update = num_rows; // overwrite, not accumulate
     }
 
     void set_per_fragment_instance_idx(int idx) { _per_fragment_instance_idx = 
idx; }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 08fb10a099..ba720967a2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -201,6 +201,13 @@ void IndexChannel::set_tablets_received_rows(
     }
 }
 
+void IndexChannel::set_tablets_filtered_rows(
+        const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, 
int64_t node_id) {
+    for (const auto& [tablet_id, rows_num] : tablets_filtered_rows) {
+        _tablets_filtered_rows[tablet_id].emplace_back(node_id, rows_num); // one entry per node
+    }
+}
+
 Status IndexChannel::check_tablet_received_rows_consistency() {
     for (auto& tablet : _tablets_received_rows) {
         for (size_t i = 0; i < tablet.second.size(); i++) {
@@ -225,6 +232,30 @@ Status IndexChannel::check_tablet_received_rows_consistency() {
     return Status::OK();
 }
 
+// Verifies that every replica of a tablet reported the same number of filtered rows.
+// _tablets_filtered_rows maps tablet_id -> [(node_id, rows_num), ...], one pair per node.
+Status IndexChannel::check_tablet_filtered_rows_consistency() {
+    for (const auto& [tablet_id, replicas] : _tablets_filtered_rows) {
+        for (size_t i = 0; i < replicas.size(); i++) {
+            const auto& [node_id, rows_num] = replicas[i];
+            VLOG_NOTICE << "check_tablet_filtered_rows_consistency, load_id: "
+                        << print_id(_parent->_load_id) << ", txn_id: " << _parent->_txn_id
+                        << ", tablet_id: " << tablet_id << ", node_id: " << node_id
+                        << ", rows_num: " << rows_num;
+            // the first replica is the reference every other replica is compared against
+            if (i == 0 || rows_num == replicas[0].second) {
+                continue;
+            }
+            return Status::InternalError(
+                    "rows num filtered by multi replicas doesn't match, load_id={}, txn_id={}, "
+                    "tablet_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}",
+                    print_id(_parent->_load_id), _parent->_txn_id, tablet_id, node_id, rows_num,
+                    replicas[0].first, replicas[0].second);
+        }
+    }
+    return Status::OK();
+}
+
 VNodeChannel::VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
         : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
     _node_channel_tracker = std::make_shared<MemTracker>(fmt::format(
@@ -432,8 +463,8 @@ Status VNodeChannel::open_wait() {
                                                             
tablet.received_rows());
                     }
                     if (tablet.has_num_rows_filtered()) {
-                        
_state->update_num_rows_filtered_in_strict_mode_partial_update(
-                                tablet.num_rows_filtered());
+                        _tablets_filtered_rows.emplace_back(tablet.tablet_id(),
+                                                            
tablet.num_rows_filtered());
                     }
                     VLOG_CRITICAL << "master replica commit info: tabletId=" 
<< tablet.tablet_id()
                                   << ", backendId=" << _node_id
@@ -866,6 +897,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
 
         _index_channel->set_error_tablet_in_state(state);
         _index_channel->set_tablets_received_rows(_tablets_received_rows, 
_node_id);
+        _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, 
_node_id); // grouped per tablet for the replica check
         return Status::OK();
     }
 
@@ -1398,6 +1430,8 @@ Status 
VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
         status = index_st;
     } else if (Status st = ich->check_tablet_received_rows_consistency(); 
!st.ok()) {
         status = st;
+    } else if (Status st = ich->check_tablet_filtered_rows_consistency(); 
!st.ok()) {
+        status = st; // replicas disagree on a tablet's filtered-row count
     }
     return status;
 }
@@ -1521,6 +1555,21 @@ Status VOlapTableSink::close(RuntimeState* state, Status 
exec_status) {
                                             &total_wait_exec_time_ns, 
&wait_exec_time,
                                             &total_add_batch_num);
                         });
+
+                // Due to the non-determinism of compaction, the rowsets of 
each replica may be different from each other on different
+                // BE nodes. The number of rows filtered in SegmentWriter 
depends on the historical rowsets located in the corresponding
+                // BE node. So we check the number of rows filtered on each 
successful BE to ensure the consistency of the current load
+                if (!_write_single_replica && _schema->is_strict_mode() &&
+                    _schema->is_partial_update()) {
+                    if (Status st = 
index_channel->check_tablet_filtered_rows_consistency();
+                        !st.ok()) {
+                        status = st;
+                    } else {
+                        
state->set_num_rows_filtered_in_strict_mode_partial_update(
+                                index_channel->num_rows_filtered());
+                    }
+                }
+
                 num_node_channels += index_channel->num_node_channels();
                 if (add_batch_exec_time > max_add_batch_exec_time_ns) {
                     max_add_batch_exec_time_ns = add_batch_exec_time;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 7b851c420e..26d2ba55f3 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -37,6 +37,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <numeric> // std::accumulate in IndexChannel::num_rows_filtered()
 #include <ostream>
 #include <queue>
 #include <set>
@@ -370,6 +371,8 @@ protected:
     RuntimeState* _state;
     // rows number received per tablet, tablet_id -> rows_num
     std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
+    // rows filtered per tablet as reported to this node channel, tablet_id -> filtered_rows_num
+    std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;
 
     std::unique_ptr<vectorized::MutableBlock> _cur_mutable_block;
     PTabletWriterAddBlockRequest _cur_add_block_request;
@@ -418,9 +421,20 @@ public:
 
     void set_tablets_received_rows(
             const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+    void set_tablets_filtered_rows(
+            const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id);
+    int64_t num_rows_filtered() const {
+        CHECK(!_tablets_filtered_rows.empty());
+        // No rollup/MV on Unique tables: sum replica 0's count per tablet, in int64_t (not int)
+        return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(),
+                               int64_t {0},
+                               [](int64_t sum, const auto& a) { return sum + a.second[0].second; });
+    }
 
     // check whether the rows num written by different replicas is consistent
     Status check_tablet_received_rows_consistency();
+    // check whether the rows num filtered by different replicas is consistent
+    Status check_tablet_filtered_rows_consistency();
 
     vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
 
@@ -456,6 +470,9 @@ private:
     // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, 
rows_num>
     // used to verify whether the rows num received by different replicas is 
consistent
     std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> 
_tablets_received_rows;
+    // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_id, 
filtered_rows_num>
+    // used to verify whether the rows num filtered by different replicas is 
consistent
+    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> 
_tablets_filtered_rows;
 };

 // Write block data to Olap Table.
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
index 256cf018d5..3b7a875f2c 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
@@ -30,7 +30,7 @@ suite("test_partial_update_strict_mode", "p0") {
                 `last_access_time` datetime NULL 
             ) ENGINE = OLAP UNIQUE KEY(`id`) 
             COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) 
-            BUCKETS AUTO PROPERTIES ( 
+            BUCKETS 4 PROPERTIES ( 
                 "replication_allocation" = "tag.location.default: 1", 
                 "storage_format" = "V2", 
                 "enable_unique_key_merge_on_write" = "true", 
@@ -80,7 +80,7 @@ suite("test_partial_update_strict_mode", "p0") {
                 `last_access_time` datetime NULL 
             ) ENGINE = OLAP UNIQUE KEY(`id`) 
             COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) 
-            BUCKETS AUTO PROPERTIES ( 
+            BUCKETS 4 PROPERTIES ( 
                 "replication_allocation" = "tag.location.default: 1", 
                 "storage_format" = "V2", 
                 "enable_unique_key_merge_on_write" = "true", 
@@ -129,7 +129,7 @@ suite("test_partial_update_strict_mode", "p0") {
                 `last_access_time` datetime NULL 
             ) ENGINE = OLAP UNIQUE KEY(`id`) 
             COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) 
-            BUCKETS AUTO PROPERTIES ( 
+            BUCKETS 4 PROPERTIES ( 
                 "replication_allocation" = "tag.location.default: 1", 
                 "storage_format" = "V2", 
                 "enable_unique_key_merge_on_write" = "true", 
@@ -179,7 +179,7 @@ suite("test_partial_update_strict_mode", "p0") {
                 `last_access_time` datetime NULL 
             ) ENGINE = OLAP UNIQUE KEY(`id`) 
             COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) 
-            BUCKETS AUTO PROPERTIES ( 
+            BUCKETS 4 PROPERTIES ( 
                 "replication_allocation" = "tag.location.default: 1", 
                 "storage_format" = "V2", 
                 "enable_unique_key_merge_on_write" = "true", 
@@ -211,4 +211,5 @@ suite("test_partial_update_strict_mode", "p0") {
 
     sql "sync"
     qt_sql """select * from ${tableName4} order by id;"""
+    sql """ DROP TABLE IF EXISTS ${tableName4}; """ // clean up after the final query
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to