This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e0d08ca6f67 branch-3.0: [Fix](cloud) `calc_sync_versions` should 
consider full compaction (#55630) (#55699)
e0d08ca6f67 is described below

commit e0d08ca6f67d9eca67eaa29bb08e4ba41deea54a
Author: bobhan1 <[email protected]>
AuthorDate: Fri Sep 5 17:47:38 2025 +0800

    branch-3.0: [Fix](cloud) `calc_sync_versions` should consider full 
compaction (#55630) (#55699)
    
    pick https://github.com/apache/doris/pull/55630
---
 .../cloud/cloud_cumulative_compaction_policy.cpp   |  11 ++
 be/src/cloud/cloud_full_compaction.cpp             |  11 ++
 be/src/cloud/cloud_meta_mgr.cpp                    |   2 +
 be/src/cloud/cloud_tablet.h                        |   3 +
 cloud/src/meta-service/meta_service.cpp            |  22 ++-
 cloud/src/meta-service/meta_service_job.cpp        |   1 +
 cloud/test/meta_service_test.cpp                   |  55 +++++--
 gensrc/proto/cloud.proto                           |   3 +-
 .../cloud/test_cloud_calc_sync_version.out         | Bin 0 -> 526 bytes
 .../cloud/test_cloud_calc_sync_version.groovy      | 179 +++++++++++++++++++++
 10 files changed, 269 insertions(+), 18 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 24bd61db8fa..f4178320044 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -219,6 +219,17 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
         int64_t last_cumulative_point) {
     TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), 
output_rowset.get(),
                                            last_cumulative_point);
+    
DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
 {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        auto cumu_point = dp->param<int64_t>("cumu_point", -1);
+        if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
+            LOG_INFO(
+                    
"[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
+                    "tablet_id={}, cumu_point={}",
+                    target_tablet_id, cumu_point);
+            return cumu_point;
+        }
+    });
     // for MoW table, if there's too many versions, the delete bitmap will 
grow to
     // a very big size, which may cause the tablet meta too big and the 
`save_meta`
     // operation too slow.
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index 7358f6d1915..08e43ab2142 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -149,6 +149,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
 }
 
 Status CloudFullCompaction::execute_compact() {
+    DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        LOG_INFO(
+                "[verbose] CloudFullCompaction::execute_compact.block, 
target_tablet_id={}, "
+                "tablet_id={}",
+                target_tablet_id, cloud_tablet()->tablet_id());
+        if (target_tablet_id == cloud_tablet()->tablet_id()) {
+            DBUG_BLOCK;
+        }
+    });
     
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", 
Status::OK(),
                                       this);
 #ifndef __APPLE__
@@ -272,6 +282,7 @@ Status CloudFullCompaction::modify_rowsets() {
         cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
         cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
         cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
+        cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt());
         cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
         if (output_rowset_delete_bitmap) {
             
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 904e8e2e099..a7a560f6719 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -521,6 +521,7 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
             }
             req.set_base_compaction_cnt(tablet->base_compaction_cnt());
             
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+            req.set_full_compaction_cnt(tablet->full_compaction_cnt());
             req.set_cumulative_point(tablet->cumulative_layer_point());
         }
         req.set_end_version(-1);
@@ -772,6 +773,7 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
             tablet->last_cumu_compaction_success_time_ms = 
stats.last_cumu_compaction_time_ms();
             tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
             
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+            tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
             tablet->set_cumulative_layer_point(stats.cumulative_point());
             tablet->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
                                             stats.num_rows(), 
stats.data_size());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index d1ea7dec379..1f3c202d06a 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -123,12 +123,14 @@ public:
     int64_t max_version_unlocked() const override { return _max_version; }
     int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
     int64_t cumulative_compaction_cnt() const { return 
_cumulative_compaction_cnt; }
+    int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
     int64_t cumulative_layer_point() const {
         return _cumulative_point.load(std::memory_order_relaxed);
     }
 
     void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
     void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
+    void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
     void set_cumulative_layer_point(int64_t new_point);
 
     int64_t last_cumu_compaction_failure_time() { return 
_last_cumu_compaction_failure_millis; }
@@ -322,6 +324,7 @@ private:
 
     int64_t _base_compaction_cnt = 0;
     int64_t _cumulative_compaction_cnt = 0;
+    int64_t _full_compaction_cnt = 0;
     int64_t _max_version = -1;
     int64_t _base_size = 0;
     int64_t _alter_version = -1;
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 2fa746e5f69..68ee07cd268 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1603,6 +1603,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, 
int64_t end,
 std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t 
req_bc_cnt, int64_t bc_cnt,
                                                             int64_t 
req_cc_cnt, int64_t cc_cnt,
                                                             int64_t req_cp, 
int64_t cp,
+                                                            int64_t 
req_fc_cnt, int64_t fc_cnt,
                                                             int64_t req_start, 
int64_t req_end) {
     using Version = std::pair<int64_t, int64_t>;
     // combine `v1` `v2`  to `v1`, return true if success
@@ -1628,8 +1629,8 @@ std::vector<std::pair<int64_t, int64_t>> 
calc_sync_versions(int64_t req_bc_cnt,
 
     if (req_cc_cnt < cc_cnt) {
         Version cc_version;
-        if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
-            // * only one CC happened and CP changed
+        if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) {
+            // * only one CC happened and CP changed, and no full compaction 
happened
             // BE  [=][=][=][=][=====][=][=]
             //                  ^~~~~ req_cp
             // MS  [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
@@ -1653,6 +1654,13 @@ std::vector<std::pair<int64_t, int64_t>> 
calc_sync_versions(int64_t req_bc_cnt,
             //                  ^_____________________^ related_versions: 
[req_cp, max]
             //                                           there may be holes if 
we don't return all version
             //                                           after ms_cp, however 
it can be optimized.
+            // * one CC happened and CP changed, and full compaction happened
+            // BE  [=][=][=][=][=][=][=][=][=][=]
+            //                  ^~~~~ req_cp
+            // MS  [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+            //                      ^~~~~~~ ms_cp
+            //                  ^___________________________^ 
related_versions: [req_cp, max]
+            //
             cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1};
         }
         if (versions.empty() || !combine_if_overlapping(versions.front(), 
cc_version)) {
@@ -1723,6 +1731,7 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
     }
     int64_t req_bc_cnt = request->base_compaction_cnt();
     int64_t req_cc_cnt = request->cumulative_compaction_cnt();
+    int64_t req_fc_cnt = request->has_full_compaction_cnt() ? 
request->full_compaction_cnt() : 0;
     int64_t req_cp = request->cumulative_point();
 
     do {
@@ -1807,6 +1816,8 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
 
         int64_t bc_cnt = tablet_stat.base_compaction_cnt();
         int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
+        int64_t fc_cnt =
+                tablet_stat.has_full_compaction_cnt() ? 
tablet_stat.full_compaction_cnt() : 0;
         int64_t cp = tablet_stat.cumulative_point();
 
         response->mutable_stats()->CopyFrom(tablet_stat);
@@ -1818,17 +1829,18 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
         
//==========================================================================
         //      Find version ranges to be synchronized due to compaction
         
//==========================================================================
-        if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
+        if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || 
req_fc_cnt > fc_cnt) {
             code = MetaServiceCode::INVALID_ARGUMENT;
             ss << "no valid compaction_cnt or cumulative_point given. 
req_bc_cnt=" << req_bc_cnt
                << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", 
cc_cnt=" << cc_cnt
-               << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" << 
tablet_id;
+               << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", 
req_cp=" << req_cp
+               << ", cp=" << cp << " tablet_id=" << tablet_id;
             msg = ss.str();
             LOG(WARNING) << msg;
             return;
         }
         auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
-                                           req_start, req_end);
+                                           req_fc_cnt, fc_cnt, req_start, 
req_end);
         for (auto [start, end] : versions) {
             internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, 
code, msg, response);
             if (code != MetaServiceCode::OK) {
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 7b1f1346203..76b6a8d0c43 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -755,6 +755,7 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
     } else if (compaction.type() == TabletCompactionJobPB::FULL) {
         // clang-format off
         stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
+        stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? 
stats->full_compaction_cnt() + 1 : 1);
         if (compaction.output_cumulative_point() > stats->cumulative_point()) {
             // After supporting parallel cumu compaction, compaction with 
older cumu point may be committed after
             // new cumu point has been set, MUST NOT set cumu point back to 
old value
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 8150dced5bd..6f94ec3b46a 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -3781,7 +3781,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) {
 
 extern std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(
         int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t 
cc_cnt, int64_t req_cp,
-        int64_t cp, int64_t req_start, int64_t req_end);
+        int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, 
int64_t req_end);
 
 TEST(MetaServiceTest, CalcSyncVersionsTest) {
     using Versions = std::vector<std::pair<int64_t, int64_t>>;
@@ -3797,7 +3797,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{8, 12}}));
     }
@@ -3813,7 +3813,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 10};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12]
     }
@@ -3822,7 +3822,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12]
     }
@@ -3839,7 +3839,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v 
[8, 12]
     }
@@ -3855,7 +3855,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v 
[8, 12]
     }
@@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v 
[8, 12]
     }
@@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}}));
     }
@@ -3895,7 +3895,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
         auto [req_cp, cp] = std::tuple {8, 8};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12]
     }
@@ -3904,7 +3904,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 10};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12]
     }
@@ -3913,7 +3913,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 15};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8, 
12]
     }
@@ -3922,11 +3922,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
         auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
         auto [req_cp, cp] = std::tuple {5, 5};
         auto [req_start, req_end] = std::tuple {8, 12};
-        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
                                            req_start, req_end);
         // [0, 4] v [5, max] v [8, 12]
         ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
     }
+
+    {
+        // when there exists full compaction, we can't optimize by "* only one 
CC happened and CP changed"
+
+        // * one CC happened and CP changed, and full compaction happened
+        // BE  [=][=][=][=][=][=][=][=][=][=]
+        //                  ^~~~~ req_cp
+        // MS  [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+        //                      ^~~~~~~ ms_cp
+        //                  ^___________________________^ related_versions: 
[req_cp, max]
+        //
+        auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+        auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+        auto [req_cp, cp] = std::tuple {4, 7};
+        auto [req_start, req_end] = std::tuple {9, 12};
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 1,
+                                           req_start, req_end);
+        ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
+    }
+
+    {
+        // abnormal case:
+        auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+        auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+        auto [req_cp, cp] = std::tuple {4, 7};
+        auto [req_start, req_end] = std::tuple {9, 12};
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp, 0, 0,
+                                           req_start, req_end);
+        // when not considering full compaction, the returned versions are 
wrong because rowsets in [7-8] are missed
+        ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}}));
+    }
 }
 
 TEST(MetaServiceTest, StageTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index bc4a219d026..345e03e8574 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -624,7 +624,7 @@ message TabletStatsPB {
     optional int64 cumulative_point = 9;
     optional int64 last_base_compaction_time_ms = 10;
     optional int64 last_cumu_compaction_time_ms = 11;
-    optional int64 full_compaction_cnt = 12;
+    optional int64 full_compaction_cnt = 12; // used by calc_sync_versions() 
only
     optional int64 last_full_compaction_time_ms = 13;
     optional int64 index_size = 14;
     optional int64 segment_size = 15; 
@@ -1043,6 +1043,7 @@ message GetRowsetRequest {
     // for compability reason we use FILL_WITH_DICT as default
     optional SchemaOp schema_op = 8  [default = FILL_WITH_DICT];
     optional string request_ip = 9;
+    optional int64 full_compaction_cnt = 10;
 }
 
 message GetRowsetResponse {
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
new file mode 100644
index 00000000000..ed930452a5d
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
 differ
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
new file mode 100644
index 00000000000..12bdd2dd007
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_calc_sync_version","docker") {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    docker(options) {
+        def write_cluster = "write_cluster"
+        def read_cluster = "read_cluster"
+
+        // Add two clusters
+        cluster.addBackend(1, write_cluster)
+        cluster.addBackend(1, read_cluster)
+
+        sql "use @${write_cluster}"
+        logger.info("==== switch to write cluster")
+        def tableName = "test_cloud_calc_sync_version"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k` int ,
+                `v` int ,
+            ) engine=olap
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            properties(
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true")
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+        sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+        sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+        qt_sql "select * from ${tableName} order by k;"
+
+        def check_rs_metas = { tbl, check_func -> 
+            def compactionUrl = sql_return_maparray("show tablets from 
${tbl};").get(0).CompactionStatus
+            def (code, out, err) = curl("GET", compactionUrl)
+            assert code == 0
+            def jsonMeta = parseJson(out.trim())
+            logger.info("==== rowsets: ${jsonMeta.rowsets}, cumu point: 
${jsonMeta["cumulative point"]}")
+            check_func(jsonMeta.rowsets, jsonMeta["cumulative point"])
+        }
+
+        def tabletStats = sql_return_maparray("show tablets from 
${tableName};")
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        def backends = sql_return_maparray('show backends')
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host} 
with backendId=${tabletBackend.BackendId}");
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+
+        def do_cumu_compaction = { def tbl, def tablet_id, int start, int end, 
int cp ->
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
 [tablet_id: "${tablet_id}", cumu_point: "${cp}"])
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
 [tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"])
+
+            trigger_and_wait_compaction(tbl, "cumulative")
+
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point")
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+        }
+
+        try {
+            // [2-2],[3-3],[4-4] -> [2,4]
+            do_cumu_compaction(tableName, tabletId, 2, 4, 5)
+            qt_sql "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 2
+                assert cumu_point as int == 5
+                assert rowsets[1].contains("[2-4]")
+            })
+
+            sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5
+            sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6
+            sql "sync;"
+
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block",
 [tablet_id: "${tabletId}"])
+            def t1 = thread("full compaction") {
+                // [2,4],[5-5],[6-6] -> [2,6]
+                sql "use @${write_cluster}"
+                trigger_and_wait_compaction(tableName, "full")
+            }
+
+            sleep(1500)
+            sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7
+            sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8
+            sql "sync;"
+            qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+            // read cluster syncs rowsets [2-4],[5-5],[6-6],[7-7],[8-8], 
bc_cnt=0, cc_cnt=1, cp=5
+            sql "use @${read_cluster}"
+            logger.info("==== switch to read cluster")
+            qt_read_cluster_query "select * from ${tableName} order by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 6
+                assert cumu_point as int == 5
+                assert rowsets[1].contains("[2-4]")
+                assert rowsets[2].contains("[5-5]")
+                assert rowsets[3].contains("[6-6]")
+                assert rowsets[4].contains("[7-7]")
+                assert rowsets[5].contains("[8-8]")
+            })
+
+
+            sql "use @${write_cluster}"
+            logger.info("==== switch to write cluster")
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block")
+            t1.get()
+            qt_write_cluster_full_compaction "select * from ${tableName} order 
by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 4
+                assert cumu_point as int == 7 // updated by full compaction
+                assert rowsets[1].contains("[2-6]")
+                assert rowsets[2].contains("[7-7]")
+                assert rowsets[3].contains("[8-8]")
+            })
+
+
+            do_cumu_compaction(tableName, tabletId, 7, 8, 7)
+            qt_write_cluster_cumu_compaction "select * from ${tableName} order 
by k;"
+            check_rs_metas(tableName, {def rowsets, def cumu_point ->
+                assert rowsets.size() == 3
+                assert cumu_point as int == 7
+                assert rowsets[1].contains("[2-6]")
+                assert rowsets[2].contains("[7-8]")
+            })
+
+            sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9
+            sql "sync;"
+            qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+            // without the fix, the read cluster would read duplicate keys (ver=9 and ver=7) because it 
would not sync rowset [7-8]
+            sql "use @${read_cluster}"
+            logger.info("==== switch to read cluster")
+            sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;"
+            qt_read_cluster_check_dup_key "select k,count() from ${tableName} 
group by k having count()>1;"
+            qt_read_cluster_res "select * from ${tableName} order by k;"
+
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to