This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b5602c665a3 branch-3.1: [Fix](cloud) `calc_sync_versions` should
consider full compaction #55630 (#55710)
b5602c665a3 is described below
commit b5602c665a3c7d9f91b0aa8105cac81ea7cb7744
Author: bobhan1 <[email protected]>
AuthorDate: Sat Sep 6 00:45:03 2025 +0800
branch-3.1: [Fix](cloud) `calc_sync_versions` should consider full
compaction #55630 (#55710)
pick #55630
---
.../cloud/cloud_cumulative_compaction_policy.cpp | 11 ++
be/src/cloud/cloud_full_compaction.cpp | 11 ++
be/src/cloud/cloud_meta_mgr.cpp | 2 +
be/src/cloud/cloud_tablet.h | 3 +
cloud/src/meta-service/meta_service.cpp | 22 ++-
cloud/src/meta-service/meta_service_job.cpp | 1 +
cloud/test/meta_service_test.cpp | 55 +++++--
gensrc/proto/cloud.proto | 3 +-
.../cloud/test_cloud_calc_sync_version.out | Bin 0 -> 526 bytes
.../cloud/test_cloud_calc_sync_version.groovy | 179 +++++++++++++++++++++
10 files changed, 269 insertions(+), 18 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 24bd61db8fa..f4178320044 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -219,6 +219,17 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
int64_t last_cumulative_point) {
TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0),
output_rowset.get(),
last_cumulative_point);
+
DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
{
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto cumu_point = dp->param<int64_t>("cumu_point", -1);
+ if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
+ LOG_INFO(
+
"[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
+ "tablet_id={}, cumu_point={}",
+ target_tablet_id, cumu_point);
+ return cumu_point;
+ }
+ });
// for MoW table, if there's too many versions, the delete bitmap will
grow to
// a very big size, which may cause the tablet meta too big and the
`save_meta`
// operation too slow.
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index 7358f6d1915..08e43ab2142 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -149,6 +149,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
}
Status CloudFullCompaction::execute_compact() {
+ DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ LOG_INFO(
+ "[verbose] CloudFullCompaction::execute_compact.block,
target_tablet_id={}, "
+ "tablet_id={}",
+ target_tablet_id, cloud_tablet()->tablet_id());
+ if (target_tablet_id == cloud_tablet()->tablet_id()) {
+ DBUG_BLOCK;
+ }
+ });
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl",
Status::OK(),
this);
#ifndef __APPLE__
@@ -272,6 +282,7 @@ Status CloudFullCompaction::modify_rowsets() {
cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
+ cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index a05d9de402a..fbaf2daf9a3 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -518,6 +518,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
}
req.set_base_compaction_cnt(tablet->base_compaction_cnt());
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+ req.set_full_compaction_cnt(tablet->full_compaction_cnt());
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
@@ -771,6 +772,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+ tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
tablet->set_cumulative_layer_point(stats.cumulative_point());
tablet->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
stats.num_rows(),
stats.data_size());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 7fc52438ca3..38b9cf94e6a 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -123,12 +123,14 @@ public:
int64_t max_version_unlocked() const override { return _max_version; }
int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
int64_t cumulative_compaction_cnt() const { return
_cumulative_compaction_cnt; }
+ int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
int64_t cumulative_layer_point() const {
return _cumulative_point.load(std::memory_order_relaxed);
}
void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
+ void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
void set_cumulative_layer_point(int64_t new_point);
int64_t last_cumu_compaction_failure_time() { return
_last_cumu_compaction_failure_millis; }
@@ -325,6 +327,7 @@ private:
int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
+ int64_t _full_compaction_cnt = 0;
int64_t _max_version = -1;
int64_t _base_size = 0;
int64_t _alter_version = -1;
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index aeb3e46c7da..d5efa55c606 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2280,6 +2280,7 @@ void internal_get_rowset(Transaction* txn, int64_t start,
int64_t end,
std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t
req_bc_cnt, int64_t bc_cnt,
int64_t
req_cc_cnt, int64_t cc_cnt,
int64_t req_cp,
int64_t cp,
+ int64_t
req_fc_cnt, int64_t fc_cnt,
int64_t req_start,
int64_t req_end) {
using Version = std::pair<int64_t, int64_t>;
// combine `v1` `v2` to `v1`, return true if success
@@ -2305,8 +2306,8 @@ std::vector<std::pair<int64_t, int64_t>>
calc_sync_versions(int64_t req_bc_cnt,
if (req_cc_cnt < cc_cnt) {
Version cc_version;
- if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
- // * only one CC happened and CP changed
+ if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) {
+ // * only one CC happened and CP changed, and no full compaction
happened
// BE [=][=][=][=][=====][=][=]
// ^~~~~ req_cp
// MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
@@ -2330,6 +2331,13 @@ std::vector<std::pair<int64_t, int64_t>>
calc_sync_versions(int64_t req_bc_cnt,
// ^_____________________^ related_versions:
[req_cp, max]
// there may be holes if
we don't return all version
// after ms_cp, however
it can be optimized.
+ // * one CC happened and CP changed, and full compaction happened
+ // BE [=][=][=][=][=][=][=][=][=][=]
+ // ^~~~~ req_cp
+ // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+ // ^~~~~~~ ms_cp
+ // ^___________________________^
related_versions: [req_cp, max]
+ //
cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1};
}
if (versions.empty() || !combine_if_overlapping(versions.front(),
cc_version)) {
@@ -2400,6 +2408,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
int64_t req_bc_cnt = request->base_compaction_cnt();
int64_t req_cc_cnt = request->cumulative_compaction_cnt();
+ int64_t req_fc_cnt = request->has_full_compaction_cnt() ?
request->full_compaction_cnt() : 0;
int64_t req_cp = request->cumulative_point();
do {
@@ -2484,6 +2493,8 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
int64_t bc_cnt = tablet_stat.base_compaction_cnt();
int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
+ int64_t fc_cnt =
+ tablet_stat.has_full_compaction_cnt() ?
tablet_stat.full_compaction_cnt() : 0;
int64_t cp = tablet_stat.cumulative_point();
response->mutable_stats()->CopyFrom(tablet_stat);
@@ -2495,17 +2506,18 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
//==========================================================================
// Find version ranges to be synchronized due to compaction
//==========================================================================
- if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
+ if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp ||
req_fc_cnt > fc_cnt) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "no valid compaction_cnt or cumulative_point given.
req_bc_cnt=" << req_bc_cnt
<< ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ",
cc_cnt=" << cc_cnt
- << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" <<
tablet_id;
+ << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ",
req_cp=" << req_cp
+ << ", cp=" << cp << " tablet_id=" << tablet_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
- req_start, req_end);
+ req_fc_cnt, fc_cnt, req_start,
req_end);
for (auto [start, end] : versions) {
internal_get_rowset(txn.get(), start, end, instance_id, tablet_id,
code, msg, response);
if (code != MetaServiceCode::OK) {
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index e974513c244..588a5cca1d6 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -832,6 +832,7 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
} else if (compaction.type() == TabletCompactionJobPB::FULL) {
// clang-format off
stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
+ stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ?
stats->full_compaction_cnt() + 1 : 1);
if (compaction.output_cumulative_point() > stats->cumulative_point()) {
// After supporting parallel cumu compaction, compaction with
older cumu point may be committed after
// new cumu point has been set, MUST NOT set cumu point back to
old value
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a52df83e3b1..74b9a321dd6 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -3781,7 +3781,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) {
extern std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(
int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t
cc_cnt, int64_t req_cp,
- int64_t cp, int64_t req_start, int64_t req_end);
+ int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start,
int64_t req_end);
TEST(MetaServiceTest, CalcSyncVersionsTest) {
using Versions = std::vector<std::pair<int64_t, int64_t>>;
@@ -3797,7 +3797,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{8, 12}}));
}
@@ -3813,7 +3813,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12]
}
@@ -3822,7 +3822,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12]
}
@@ -3839,7 +3839,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3855,7 +3855,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}}));
}
@@ -3895,7 +3895,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {8, 8};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12]
}
@@ -3904,7 +3904,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12]
}
@@ -3913,7 +3913,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8,
12]
}
@@ -3922,11 +3922,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
// [0, 4] v [5, max] v [8, 12]
ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
}
+
+ {
+ // when there exists full compaction, we can't optimize by "* only one
CC happened and CP changed"
+
+ // * one CC happened and CP changed, and full compaction happened
+ // BE [=][=][=][=][=][=][=][=][=][=]
+ // ^~~~~ req_cp
+ // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+ // ^~~~~~~ ms_cp
+ // ^___________________________^ related_versions:
[req_cp, max]
+ //
+ auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+ auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+ auto [req_cp, cp] = std::tuple {4, 7};
+ auto [req_start, req_end] = std::tuple {9, 12};
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 1,
+ req_start, req_end);
+ ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
+ }
+
+ {
+ // abnormal case:
+ auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+ auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+ auto [req_cp, cp] = std::tuple {4, 7};
+ auto [req_start, req_end] = std::tuple {9, 12};
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
+ req_start, req_end);
+ // when not considering full compaction, the returned versions are
wrong because rowsets in [7-8] are missed
+ ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}}));
+ }
}
TEST(MetaServiceTest, StageTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index de17c954374..3e18b2dde73 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -650,7 +650,7 @@ message TabletStatsPB {
optional int64 cumulative_point = 9;
optional int64 last_base_compaction_time_ms = 10;
optional int64 last_cumu_compaction_time_ms = 11;
- optional int64 full_compaction_cnt = 12;
+ optional int64 full_compaction_cnt = 12; // used by calc_sync_versions()
only
optional int64 last_full_compaction_time_ms = 13;
optional int64 index_size = 14;
optional int64 segment_size = 15;
@@ -1069,6 +1069,7 @@ message GetRowsetRequest {
// for compability reason we use FILL_WITH_DICT as default
optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT];
optional string request_ip = 9;
+ optional int64 full_compaction_cnt = 10;
}
message GetRowsetResponse {
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
new file mode 100644
index 00000000000..ed930452a5d
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
new file mode 100644
index 00000000000..12bdd2dd007
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_calc_sync_version","docker") {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ docker(options) {
+ def write_cluster = "write_cluster"
+ def read_cluster = "read_cluster"
+
+ // Add two clusters
+ cluster.addBackend(1, write_cluster)
+ cluster.addBackend(1, read_cluster)
+
+ sql "use @${write_cluster}"
+ logger.info("==== switch to write cluster")
+ def tableName = "test_cloud_calc_sync_version"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ properties(
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true")
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+ sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+ sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+ qt_sql "select * from ${tableName} order by k;"
+
+ def check_rs_metas = { tbl, check_func ->
+ def compactionUrl = sql_return_maparray("show tablets from
${tbl};").get(0).CompactionStatus
+ def (code, out, err) = curl("GET", compactionUrl)
+ assert code == 0
+ def jsonMeta = parseJson(out.trim())
+ logger.info("==== rowsets: ${jsonMeta.rowsets}, cumu point:
${jsonMeta["cumulative point"]}")
+ check_func(jsonMeta.rowsets, jsonMeta["cumulative point"])
+ }
+
+ def tabletStats = sql_return_maparray("show tablets from
${tableName};")
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ def backends = sql_return_maparray('show backends')
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host}
with backendId=${tabletBackend.BackendId}");
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def do_cumu_compaction = { def tbl, def tablet_id, int start, int end,
int cp ->
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
[tablet_id: "${tablet_id}", cumu_point: "${cp}"])
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
[tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"])
+
+ trigger_and_wait_compaction(tbl, "cumulative")
+
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+ }
+
+ try {
+ // [2-2],[3-3],[4-4] -> [2,4]
+ do_cumu_compaction(tableName, tabletId, 2, 4, 5)
+ qt_sql "select * from ${tableName} order by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 2
+ assert cumu_point as int == 5
+ assert rowsets[1].contains("[2-4]")
+ })
+
+ sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5
+ sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6
+ sql "sync;"
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block",
[tablet_id: "${tabletId}"])
+ def t1 = thread("full compaction") {
+ // [2,4],[5-5],[6-6] -> [2,6]
+ sql "use @${write_cluster}"
+ trigger_and_wait_compaction(tableName, "full")
+ }
+
+ sleep(1500)
+ sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7
+ sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8
+ sql "sync;"
+ qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+ // read cluster sync rowsets [2-4],[5-5],[6-6],[7-7],[8-8],
bc_cnt=0, cc_cnt=1, cp=5
+ sql "use @${read_cluster}"
+ logger.info("==== switch to read cluster")
+ qt_read_cluster_query "select * from ${tableName} order by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 6
+ assert cumu_point as int == 5
+ assert rowsets[1].contains("[2-4]")
+ assert rowsets[2].contains("[5-5]")
+ assert rowsets[3].contains("[6-6]")
+ assert rowsets[4].contains("[7-7]")
+ assert rowsets[5].contains("[8-8]")
+ })
+
+
+ sql "use @${write_cluster}"
+ logger.info("==== switch to write cluster")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block")
+ t1.get()
+ qt_write_cluster_full_compaction "select * from ${tableName} order
by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 4
+ assert cumu_point as int == 7 // updated by full compaction
+ assert rowsets[1].contains("[2-6]")
+ assert rowsets[2].contains("[7-7]")
+ assert rowsets[3].contains("[8-8]")
+ })
+
+
+ do_cumu_compaction(tableName, tabletId, 7, 8, 7)
+ qt_write_cluster_cumu_compaction "select * from ${tableName} order
by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 3
+ assert cumu_point as int == 7
+ assert rowsets[1].contains("[2-6]")
+ assert rowsets[2].contains("[7-8]")
+ })
+
+ sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9
+ sql "sync;"
+ qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+ // if rowset [7-8] is not synced, read cluster would read duplicate keys
(ver=9 vs. ver=7), so check that no duplicate keys are returned
+ sql "use @${read_cluster}"
+ logger.info("==== switch to read cluster")
+ sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;"
+ qt_read_cluster_check_dup_key "select k,count() from ${tableName}
group by k having count()>1;"
+ qt_read_cluster_res "select * from ${tableName} order by k;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]