This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d98c82d6257 [Fix](cloud) `calc_sync_versions` should consider full compaction (#55630)
d98c82d6257 is described below

commit d98c82d6257768333f1975b8428c3894e3b8ada4
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 4 21:37:39 2025 +0800

    [Fix](cloud) `calc_sync_versions` should consider full compaction (#55630)
    
    ### What problem does this PR solve?
    
    Currently, `MetaServiceImpl::get_rowset` uses `calc_sync_versions` to
    eliminate unnecessary version ranges when a BE syncs rowset metas. One
    of the optimizations is the following:
    ```cpp
    std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt,
                                                                int64_t req_cc_cnt, int64_t cc_cnt,
                                                                int64_t req_cp, int64_t cp,
                                                                int64_t req_start, int64_t req_end) {
        // ...
        if (req_cc_cnt < cc_cnt) {
            Version cc_version;
            if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
                // * only one CC happened and CP changed
                // BE  [=][=][=][=][=====][=][=]
                //                  ^~~~~ req_cp
                // MS  [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
                //                                  ^~~~~~~ ms_cp
                //                  ^____________^ related_versions: [req_cp, ms_cp - 1]
                //
                cc_version = {req_cp, cp - 1};
            } else {
                // ...
            }
        }
        // ...
    }
    ```
    This optimization relies on the assumption that only cumulative
    compaction changes the cumulative point. However, full compaction can
    also change the cumulative point, which breaks that assumption. In a
    multi-cluster environment this causes a data correctness problem,
    because the tablet permanently fails to sync some rowset metas.
    
    A data correctness problem has been observed in the following
    situation:
    
    1. For a certain tablet, base_compaction_cnt=14,
    cumulative_compaction_cnt=804, cumu_point=7458.
    On node A of the write cluster (cluster 0), a full compaction of
    [2-7464] and a cumulative compaction of [7465-7486] were performed. The
    stats then became base_compaction_cnt=15, cumulative_compaction_cnt=805,
    cumu_point=7465.
    2. On node B of the read cluster (cluster 1), during sync_rowset, we
    have:
    req_base_compaction_cnt=14, base_compaction_cnt=15,
    req_cumulative_compaction_cnt=804, cumulative_compaction_cnt=805,
    req_cp=7458, cp=7465,
    req_start=7487, req_end=int_max.
    3. `calc_sync_versions` computes that the rowsets to be pulled are
    [0-7464] and [7487-int_max], but it misses the rowset [7465-7486]
    produced by cumulative compaction.
    4. Moreover, since the max_version of the tablet on cluster 1 node B has
    been updated, subsequent sync_rowset operations will also not pull the
    rowset [7465-7486].
    5. This causes a duplicate-key problem on MOW tables, because new
    rowsets generate delete bitmap marks on [7465-7486], a rowset that
    node B never pulls.
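
    The scenario above can be replayed with a simplified model of the
    pre-fix logic. This is only a sketch: the function names are
    hypothetical, the base-compaction branch and the range merging are
    reconstructed from the excerpt and the unit-test expectations rather
    than copied from `meta_service.cpp`, and `int_max` is assumed to mean
    `INT64_MAX - 1`.
    ```cpp
    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <limits>
    #include <utility>
    #include <vector>

    using Version = std::pair<int64_t, int64_t>;

    // Merge v2 into v1 when the two closed ranges overlap or are adjacent.
    static bool combine_if_overlapping(Version& v1, const Version& v2) {
        if (v1.second + 1 < v2.first || v2.second + 1 < v1.first) return false;
        v1.first = std::min(v1.first, v2.first);
        v1.second = std::max(v1.second, v2.second);
        return true;
    }

    // Hypothetical model of the pre-fix behavior (no full compaction count).
    std::vector<Version> sketch_sync_versions_before_fix(int64_t req_bc_cnt, int64_t bc_cnt,
                                                         int64_t req_cc_cnt, int64_t cc_cnt,
                                                         int64_t req_cp, int64_t cp,
                                                         int64_t req_start, int64_t req_end) {
        const int64_t kMax = std::numeric_limits<int64_t>::max() - 1;
        std::vector<Version> versions;
        if (req_bc_cnt < bc_cnt) {
            // Assumption: any base compaction invalidates everything below ms_cp.
            versions.emplace_back(0, cp - 1);
        }
        if (req_cc_cnt < cc_cnt) {
            // The optimization in question: one CC and a moved CP => [req_cp, cp - 1].
            Version cc = (req_cp < cp && req_cc_cnt + 1 == cc_cnt) ? Version {req_cp, cp - 1}
                                                                    : Version {req_cp, kMax};
            if (versions.empty() || !combine_if_overlapping(versions.front(), cc)) {
                versions.push_back(cc);
            }
        }
        // Fold the explicitly requested range into the result.
        Version req {req_start, req_end};
        bool combined = false;
        for (auto& v : versions) {
            if ((combined = combine_if_overlapping(v, req))) break;
        }
        if (!combined) versions.push_back(req);
        std::sort(versions.begin(), versions.end());
        return versions;
    }

    // True when [start, end] lies entirely inside one of the returned ranges.
    bool covers(const std::vector<Version>& versions, int64_t start, int64_t end) {
        for (const auto& v : versions) {
            if (v.first <= start && end <= v.second) return true;
        }
        return false;
    }
    ```
    Called with the values from step 2, the model returns [0-7464] and
    [7487-int_max], so `covers(v, 7465, 7486)` is false: the rowset
    produced by the cumulative compaction is never requested.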
    
    ---
    This PR disables the above optimization when the full compaction
    count has changed.
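
    The guarded condition can be sketched in isolation as follows. The
    helper name `sketch_cc_version` is hypothetical; it only models the
    choice of the CC-related range and assumes the caller has already
    checked `req_cc_cnt < cc_cnt`.
    ```cpp
    #include <cassert>
    #include <cstdint>
    #include <limits>
    #include <utility>

    using Version = std::pair<int64_t, int64_t>;

    // Hypothetical helper: pick the version range affected by cumulative compaction.
    Version sketch_cc_version(int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, int64_t cp,
                              int64_t req_fc_cnt, int64_t fc_cnt) {
        const bool single_cc_moved_cp = req_cp < cp && req_cc_cnt + 1 == cc_cnt;
        const bool no_full_compaction = req_fc_cnt == fc_cnt;
        if (single_cc_moved_cp && no_full_compaction) {
            // Only cumulative compaction moved the CP: the narrow range is enough.
            return {req_cp, cp - 1};
        }
        // Otherwise fall back to everything from req_cp onwards.
        return {req_cp, std::numeric_limits<int64_t>::max() - 1};
    }
    ```
    With the counters from the scenario above and a full compaction count
    that is assumed to go from 0 to 1, the helper returns
    [7458, INT64_MAX - 1] instead of [7458, 7464], so [7465-7486] is
    included in the sync.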
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [x] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../cloud/cloud_cumulative_compaction_policy.cpp   |  11 ++
 be/src/cloud/cloud_full_compaction.cpp             |  11 ++
 be/src/cloud/cloud_meta_mgr.cpp                    |   2 +
 be/src/cloud/cloud_tablet.h                        |   3 +
 cloud/src/meta-service/meta_service.cpp            |  22 ++-
 cloud/src/meta-service/meta_service_job.cpp        |   1 +
 cloud/test/meta_service_test.cpp                   |  55 +++++--
 gensrc/proto/cloud.proto                           |   3 +-
 .../cloud/test_cloud_calc_sync_version.out         | Bin 0 -> 526 bytes
 .../cloud/test_cloud_calc_sync_version.groovy      | 179 +++++++++++++++++++++
 10 files changed, 269 insertions(+), 18 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index d666311d343..6955355778f 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -220,6 +220,17 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
         int64_t last_cumulative_point) {
     TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(),
                                            last_cumulative_point);
+    DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        auto cumu_point = dp->param<int64_t>("cumu_point", -1);
+        if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
+            LOG_INFO(
+                    "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
+                    "tablet_id={}, cumu_point={}",
+                    target_tablet_id, cumu_point);
+            return cumu_point;
+        }
+    });
     // for MoW table, if there's too many versions, the delete bitmap will grow to
     // a very big size, which may cause the tablet meta too big and the `save_meta`
     // operation too slow.
diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp
index 9677c7e2446..1102d218001 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -148,6 +148,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
 }
 
 Status CloudFullCompaction::execute_compact() {
+    DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        LOG_INFO(
+                "[verbose] CloudFullCompaction::execute_compact.block, target_tablet_id={}, "
+                "tablet_id={}",
+                target_tablet_id, cloud_tablet()->tablet_id());
+        if (target_tablet_id == cloud_tablet()->tablet_id()) {
+            DBUG_BLOCK;
+        }
+    });
     TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", Status::OK(),
                                       this);
 #ifndef __APPLE__
@@ -270,6 +280,7 @@ Status CloudFullCompaction::modify_rowsets() {
         cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
         cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
         cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
+        cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt());
         cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
         if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) {
             cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c93476d987c..ede5a74f8de 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -594,6 +594,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
             }
             req.set_base_compaction_cnt(tablet->base_compaction_cnt());
             req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+            req.set_full_compaction_cnt(tablet->full_compaction_cnt());
             req.set_cumulative_point(tablet->cumulative_layer_point());
         }
         req.set_end_version(-1);
@@ -766,6 +767,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
             tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
             tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
             tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+            tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
             tablet->set_cumulative_layer_point(stats.cumulative_point());
             tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
                                             stats.num_rows(), stats.data_size());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 5e704952632..8a4c1ae5ced 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -134,12 +134,14 @@ public:
     int64_t max_version_unlocked() const override { return _max_version; }
     int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
     int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; }
+    int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
     int64_t cumulative_layer_point() const {
         return _cumulative_point.load(std::memory_order_relaxed);
     }
 
     void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
     void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; }
+    void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
     void set_cumulative_layer_point(int64_t new_point);
 
     int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; }
@@ -346,6 +348,7 @@ private:
 
     int64_t _base_compaction_cnt = 0;
     int64_t _cumulative_compaction_cnt = 0;
+    int64_t _full_compaction_cnt = 0;
     int64_t _max_version = -1;
     int64_t _base_size = 0;
     int64_t _alter_version = -1;
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index a0a6362d3b1..a16cdc7def5 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2814,6 +2814,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
 std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt,
                                                             int64_t req_cc_cnt, int64_t cc_cnt,
                                                             int64_t req_cp, int64_t cp,
+                                                            int64_t req_fc_cnt, int64_t fc_cnt,
                                                             int64_t req_start, int64_t req_end) {
     using Version = std::pair<int64_t, int64_t>;
     // combine `v1` `v2`  to `v1`, return true if success
@@ -2839,8 +2840,8 @@ std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt,
 
     if (req_cc_cnt < cc_cnt) {
         Version cc_version;
-        if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
-            // * only one CC happened and CP changed
+        if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) {
+            // * only one CC happened and CP changed, and no full compaction happened
             // BE  [=][=][=][=][=====][=][=]
             //                  ^~~~~ req_cp
             // MS  [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
@@ -2864,6 +2865,13 @@ std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt,
             //                  ^_____________________^ related_versions: [req_cp, max]
             //                                           there may be holes if we don't return all version
             //                                           after ms_cp, however it can be optimized.
+            // * one CC happened and CP changed, and full compaction happened
+            // BE  [=][=][=][=][=][=][=][=][=][=]
+            //                  ^~~~~ req_cp
+            // MS  [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+            //                      ^~~~~~~ ms_cp
+            //                  ^___________________________^ related_versions: [req_cp, max]
+            //
             cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1};
         }
         if (versions.empty() || !combine_if_overlapping(versions.front(), cc_version)) {
@@ -2954,6 +2962,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
     }
     int64_t req_bc_cnt = request->base_compaction_cnt();
     int64_t req_cc_cnt = request->cumulative_compaction_cnt();
+    int64_t req_fc_cnt = request->has_full_compaction_cnt() ? request->full_compaction_cnt() : 0;
     int64_t req_cp = request->cumulative_point();
 
     bool is_versioned_read = is_version_read_enabled(instance_id);
@@ -3057,6 +3066,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
 
         int64_t bc_cnt = tablet_stat.base_compaction_cnt();
         int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
+        int64_t fc_cnt =
+                tablet_stat.has_full_compaction_cnt() ? tablet_stat.full_compaction_cnt() : 0;
         int64_t cp = tablet_stat.cumulative_point();
 
         response->mutable_stats()->CopyFrom(tablet_stat);
@@ -3068,17 +3079,18 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
         //==========================================================================
         //      Find version ranges to be synchronized due to compaction
         //==========================================================================
-        if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
+        if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || req_fc_cnt > fc_cnt) {
             code = MetaServiceCode::INVALID_ARGUMENT;
             ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt
                << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt
-               << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" << tablet_id;
+               << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", req_cp=" << req_cp
+               << ", cp=" << cp << " tablet_id=" << tablet_id;
             msg = ss.str();
             LOG(WARNING) << msg;
             return;
         }
         auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
-                                           req_start, req_end);
+                                           req_fc_cnt, fc_cnt, req_start, req_end);
         if (!is_versioned_read) {
             for (auto [start, end] : versions) {
                 internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg,
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 654a032b6a1..21fe8b9663b 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -764,6 +764,7 @@ int compaction_update_tablet_stats(const TabletCompactionJobPB& compaction, Tabl
     } else if (compaction.type() == TabletCompactionJobPB::FULL) {
         // clang-format off
         stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
+        stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? stats->full_compaction_cnt() + 1 : 1);
         if (compaction.output_cumulative_point() > stats->cumulative_point()) {
             // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
             // new cumu point has been set, MUST NOT set cumu point back to old value
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 7936fabe146..f4e57c208ee 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -3812,7 +3812,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) {
 
 extern std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(
         int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp,
-        int64_t cp, int64_t req_start, int64_t req_end);
+        int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end);
 
 TEST(MetaServiceTest, CalcSyncVersionsTest) {
     using Versions = std::vector<std::pair<int64_t, int64_t>>;
@@ -3828,7 +3828,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{8, 12}}));
     }
@@ -3844,7 +3844,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 10};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12]
     }
@@ -3853,7 +3853,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12]
     }
@@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
     }
@@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
     }
@@ -3901,7 +3901,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
     }
@@ -3917,7 +3917,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}}));
     }
@@ -3926,7 +3926,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {8, 8};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12]
     }
@@ -3935,7 +3935,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 10};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12]
     }
@@ -3944,7 +3944,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8, 12]
     }
@@ -3953,11 +3953,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         // [0, 4] v [5, max] v [8, 12]
         ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
     }
+
+    {
+        // when there exists full compaction, we can't optimize by "* only one CC happened and CP changed"
+
+        // * one CC happened and CP changed, and full compaction happened
+        // BE  [=][=][=][=][=][=][=][=][=][=]
+        //                  ^~~~~ req_cp
+        // MS  [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+        //                      ^~~~~~~ ms_cp
+        //                  ^___________________________^ related_versions: [req_cp, max]
+        //
+        auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+        auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+        auto [req_cp, cp] = std::tuple {4, 7};
+        auto [req_start, req_end] = std::tuple {9, 12};
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 1,
+                                           req_start, req_end);
+        ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
+    }
+
+    {
+        // abnormal case:
+        auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+        auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+        auto [req_cp, cp] = std::tuple {4, 7};
+        auto [req_start, req_end] = std::tuple {9, 12};
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
+                                           req_start, req_end);
+        // when not considering full compaction, the returned versions is wrong becasue rowsets in [7-8] are missed
+        ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}}));
+    }
 }
 
 TEST(MetaServiceTest, StageTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0bde2977a7b..e277c24f8d7 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -816,7 +816,7 @@ message TabletStatsPB {
     optional int64 cumulative_point = 9;
     optional int64 last_base_compaction_time_ms = 10;
     optional int64 last_cumu_compaction_time_ms = 11;
-    optional int64 full_compaction_cnt = 12;
+    optional int64 full_compaction_cnt = 12; // used by calc_sync_versions() only
     optional int64 last_full_compaction_time_ms = 13;
     optional int64 index_size = 14;
     optional int64 segment_size = 15;
@@ -1243,6 +1243,7 @@ message GetRowsetRequest {
     // for compability reason we use FILL_WITH_DICT as default
     optional SchemaOp schema_op = 8  [default = FILL_WITH_DICT];
     optional string request_ip = 9;
+    optional int64 full_compaction_cnt = 10;
 }
 
 message GetRowsetResponse {
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
new file mode 100644
index 00000000000..ed930452a5d
Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out differ
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
new file mode 100644
index 00000000000..12bdd2dd007
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_calc_sync_version","docker") {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    docker(options) {
+        def write_cluster = "write_cluster"
+        def read_cluster = "read_cluster"
+
+        // Add two clusters
+        cluster.addBackend(1, write_cluster)
+        cluster.addBackend(1, read_cluster)
+
+        sql "use @${write_cluster}"
+        logger.info("==== switch to write cluster")
+        def tableName = "test_cloud_calc_sync_version"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k` int ,
+                `v` int ,
+            ) engine=olap
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            properties(
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true")
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+        sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+        sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+        qt_sql "select * from ${tableName} order by k;"
+
+        def check_rs_metas = { tbl, check_func -> 
+            def compactionUrl = sql_return_maparray("show tablets from ${tbl};").get(0).CompactionStatus
+            def (code, out, err) = curl("GET", compactionUrl)
+            assert code == 0
+            def jsonMeta = parseJson(out.trim())
+            logger.info("==== rowsets: ${jsonMeta.rowsets}, cumu point: ${jsonMeta["cumulative point"]}")
+            check_func(jsonMeta.rowsets, jsonMeta["cumulative point"])
+        }
+
+        def tabletStats = sql_return_maparray("show tablets from ${tableName};")
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        def backends = sql_return_maparray('show backends')
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+
+        def do_cumu_compaction = { def tbl, def tablet_id, int start, int end, int cp ->
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", [tablet_id: "${tablet_id}", cumu_point: "${cp}"])
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", [tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"])
+
+            trigger_and_wait_compaction(tbl, "cumulative")
+
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point")
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+        }
+
+        try {
+            // [2-2],[3-3],[4-4] -> [2,4]
+            do_cumu_compaction(tableName, tabletId, 2, 4, 5)
+            qt_sql "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 2
+                assert cumu_point as int == 5
+                assert rowsets[1].contains("[2-4]")
+            })
+
+            sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5
+            sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6
+            sql "sync;"
+
+            GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block", [tablet_id: "${tabletId}"])
+            def t1 = thread("full compaction") {
+                // [2,4],[5-5],[6-6] -> [2,6]
+                sql "use @${write_cluster}"
+                trigger_and_wait_compaction(tableName, "full")
+            }
+
+            sleep(1500)
+            sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7
+            sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8
+            sql "sync;"
+            qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+            // read cluster sync rowsets [2-4],[5-5],[6-6],[7-7],[8-8], bc_cnt=0, cc_cnt=1, cp=4
+            sql "use @${read_cluster}"
+            logger.info("==== switch to read cluster")
+            qt_read_cluster_query "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 6
+                assert cumu_point as int == 5
+                assert rowsets[1].contains("[2-4]")
+                assert rowsets[2].contains("[5-5]")
+                assert rowsets[3].contains("[6-6]")
+                assert rowsets[4].contains("[7-7]")
+                assert rowsets[5].contains("[8-8]")
+            })
+
+
+            sql "use @${write_cluster}"
+            logger.info("==== switch to write cluster")
+            GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block")
+            t1.get()
+            qt_write_cluster_full_compaction "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 4
+                assert cumu_point as int == 7 // updated by full compaction
+                assert rowsets[1].contains("[2-6]")
+                assert rowsets[2].contains("[7-7]")
+                assert rowsets[3].contains("[8-8]")
+            })
+
+
+            do_cumu_compaction(tableName, tabletId, 7, 8, 7)
+            qt_write_cluster_cumu_compaction "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 3
+                assert cumu_point as int == 7
+                assert rowsets[1].contains("[2-6]")
+                assert rowsets[2].contains("[7-8]")
+            })
+
+            sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9
+            sql "sync;"
+            qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+            // read cluster will read dup keys of ver=9 to ver=7 because it will not sync rowset [7-8]
+            sql "use @${read_cluster}"
+            logger.info("==== switch to read cluster")
+            sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;"
+            qt_read_cluster_check_dup_key "select k,count() from ${tableName} group by k having count()>1;"
+            qt_read_cluster_res "select * from ${tableName} order by k;"
+
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}

