This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d98c82d6257 [Fix](cloud) `calc_sync_versions` should consider full
compaction (#55630)
d98c82d6257 is described below
commit d98c82d6257768333f1975b8428c3894e3b8ada4
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 4 21:37:39 2025 +0800
[Fix](cloud) `calc_sync_versions` should consider full compaction (#55630)
### What problem does this PR solve?
Currently, `MetaServiceImpl::get_rowset` uses `calc_sync_versions` to
eliminate unnecessary version ranges when a BE syncs rowset metas. One of
the optimizations is as follows:
```cpp
std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t
req_bc_cnt, int64_t bc_cnt,
int64_t
req_cc_cnt, int64_t cc_cnt,
int64_t req_cp,
int64_t cp,
int64_t
req_start, int64_t req_end) {
// ...
if (req_cc_cnt < cc_cnt) {
Version cc_version;
if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
// * only one CC happened and CP changed
// BE [=][=][=][=][=====][=][=]
// ^~~~~ req_cp
// MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
// ^~~~~~~ ms_cp
// ^____________^ related_versions: [req_cp,
ms_cp - 1]
//
cc_version = {req_cp, cp - 1};
} else {
// ...
}
```
This optimization relies on the assumption that only cumulative
compaction will change the cumulative point. However, full compaction
can also change the cumulative point, which breaks that
assumption. This causes a data correctness problem in multi-cluster
environments because the tablet will permanently fail to sync some
rowset metas.
A data correctness problem has been observed in the following
situation:
1. For a certain tablet, base_compaction_cnt=14,
cumulative_compaction_cnt=804, cumu_point=7458.
On node A of the write cluster (cluster 0), a full compaction of
[2-7464] and a cumulative compaction of [7465-7486] were performed. The
stats then became base_compaction_cnt=15, cumulative_compaction_cnt=805,
cumu_point=7465.
2. On node B of the read cluster (cluster 1), during sync_rowset, we
have:
req_base_compaction_cnt=14, base_compaction_cnt=15,
req_cumulative_compaction_cnt=804, cumulative_compaction_cnt=805,
req_cp=7458, cp=7465,
req_start=7487, req_end=int_max.
3. `calc_sync_versions` computes that the rowsets to be pulled are [0-7464]
and [7487-int_max], but it misses the rowset [7465-7486] produced by
cumulative compaction.
4. Moreover, since the max_version of the tablet on cluster 1 node B has
been updated, subsequent sync_rowset operations will also not pull the
rowset [7465-7486].
5. This causes a duplicate key problem on the MOW table because new rowsets
will generate delete bitmap marks on [7465-7486].
---
This PR disables the above optimization when the full compaction count
has changed.
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [x] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../cloud/cloud_cumulative_compaction_policy.cpp | 11 ++
be/src/cloud/cloud_full_compaction.cpp | 11 ++
be/src/cloud/cloud_meta_mgr.cpp | 2 +
be/src/cloud/cloud_tablet.h | 3 +
cloud/src/meta-service/meta_service.cpp | 22 ++-
cloud/src/meta-service/meta_service_job.cpp | 1 +
cloud/test/meta_service_test.cpp | 55 +++++--
gensrc/proto/cloud.proto | 3 +-
.../cloud/test_cloud_calc_sync_version.out | Bin 0 -> 526 bytes
.../cloud/test_cloud_calc_sync_version.groovy | 179 +++++++++++++++++++++
10 files changed, 269 insertions(+), 18 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index d666311d343..6955355778f 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -220,6 +220,17 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
int64_t last_cumulative_point) {
TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0),
output_rowset.get(),
last_cumulative_point);
+
DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
{
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto cumu_point = dp->param<int64_t>("cumu_point", -1);
+ if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
+ LOG_INFO(
+
"[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
+ "tablet_id={}, cumu_point={}",
+ target_tablet_id, cumu_point);
+ return cumu_point;
+ }
+ });
// for MoW table, if there's too many versions, the delete bitmap will
grow to
// a very big size, which may cause the tablet meta too big and the
`save_meta`
// operation too slow.
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index 9677c7e2446..1102d218001 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -148,6 +148,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
}
Status CloudFullCompaction::execute_compact() {
+ DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ LOG_INFO(
+ "[verbose] CloudFullCompaction::execute_compact.block,
target_tablet_id={}, "
+ "tablet_id={}",
+ target_tablet_id, cloud_tablet()->tablet_id());
+ if (target_tablet_id == cloud_tablet()->tablet_id()) {
+ DBUG_BLOCK;
+ }
+ });
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl",
Status::OK(),
this);
#ifndef __APPLE__
@@ -270,6 +280,7 @@ Status CloudFullCompaction::modify_rowsets() {
cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
+ cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (stats.cumulative_compaction_cnt() >=
cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c93476d987c..ede5a74f8de 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -594,6 +594,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
}
req.set_base_compaction_cnt(tablet->base_compaction_cnt());
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+ req.set_full_compaction_cnt(tablet->full_compaction_cnt());
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
@@ -766,6 +767,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+ tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
tablet->set_cumulative_layer_point(stats.cumulative_point());
tablet->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
stats.num_rows(),
stats.data_size());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 5e704952632..8a4c1ae5ced 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -134,12 +134,14 @@ public:
int64_t max_version_unlocked() const override { return _max_version; }
int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
int64_t cumulative_compaction_cnt() const { return
_cumulative_compaction_cnt; }
+ int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
int64_t cumulative_layer_point() const {
return _cumulative_point.load(std::memory_order_relaxed);
}
void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
+ void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
void set_cumulative_layer_point(int64_t new_point);
int64_t last_cumu_compaction_failure_time() { return
_last_cumu_compaction_failure_millis; }
@@ -346,6 +348,7 @@ private:
int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
+ int64_t _full_compaction_cnt = 0;
int64_t _max_version = -1;
int64_t _base_size = 0;
int64_t _alter_version = -1;
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index a0a6362d3b1..a16cdc7def5 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2814,6 +2814,7 @@ void internal_get_rowset(Transaction* txn, int64_t start,
int64_t end,
std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t
req_bc_cnt, int64_t bc_cnt,
int64_t
req_cc_cnt, int64_t cc_cnt,
int64_t req_cp,
int64_t cp,
+ int64_t
req_fc_cnt, int64_t fc_cnt,
int64_t req_start,
int64_t req_end) {
using Version = std::pair<int64_t, int64_t>;
// combine `v1` `v2` to `v1`, return true if success
@@ -2839,8 +2840,8 @@ std::vector<std::pair<int64_t, int64_t>>
calc_sync_versions(int64_t req_bc_cnt,
if (req_cc_cnt < cc_cnt) {
Version cc_version;
- if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
- // * only one CC happened and CP changed
+ if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) {
+ // * only one CC happened and CP changed, and no full compaction
happened
// BE [=][=][=][=][=====][=][=]
// ^~~~~ req_cp
// MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
@@ -2864,6 +2865,13 @@ std::vector<std::pair<int64_t, int64_t>>
calc_sync_versions(int64_t req_bc_cnt,
// ^_____________________^ related_versions:
[req_cp, max]
// there may be holes if
we don't return all version
// after ms_cp, however
it can be optimized.
+ // * one CC happened and CP changed, and full compaction happened
+ // BE [=][=][=][=][=][=][=][=][=][=]
+ // ^~~~~ req_cp
+ // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+ // ^~~~~~~ ms_cp
+ // ^___________________________^
related_versions: [req_cp, max]
+ //
cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1};
}
if (versions.empty() || !combine_if_overlapping(versions.front(),
cc_version)) {
@@ -2954,6 +2962,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
int64_t req_bc_cnt = request->base_compaction_cnt();
int64_t req_cc_cnt = request->cumulative_compaction_cnt();
+ int64_t req_fc_cnt = request->has_full_compaction_cnt() ?
request->full_compaction_cnt() : 0;
int64_t req_cp = request->cumulative_point();
bool is_versioned_read = is_version_read_enabled(instance_id);
@@ -3057,6 +3066,8 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
int64_t bc_cnt = tablet_stat.base_compaction_cnt();
int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
+ int64_t fc_cnt =
+ tablet_stat.has_full_compaction_cnt() ?
tablet_stat.full_compaction_cnt() : 0;
int64_t cp = tablet_stat.cumulative_point();
response->mutable_stats()->CopyFrom(tablet_stat);
@@ -3068,17 +3079,18 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
//==========================================================================
// Find version ranges to be synchronized due to compaction
//==========================================================================
- if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
+ if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp ||
req_fc_cnt > fc_cnt) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "no valid compaction_cnt or cumulative_point given.
req_bc_cnt=" << req_bc_cnt
<< ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ",
cc_cnt=" << cc_cnt
- << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" <<
tablet_id;
+ << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ",
req_cp=" << req_cp
+ << ", cp=" << cp << " tablet_id=" << tablet_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
- req_start, req_end);
+ req_fc_cnt, fc_cnt, req_start,
req_end);
if (!is_versioned_read) {
for (auto [start, end] : versions) {
internal_get_rowset(txn.get(), start, end, instance_id,
tablet_id, code, msg,
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 654a032b6a1..21fe8b9663b 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -764,6 +764,7 @@ int compaction_update_tablet_stats(const
TabletCompactionJobPB& compaction, Tabl
} else if (compaction.type() == TabletCompactionJobPB::FULL) {
// clang-format off
stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
+ stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ?
stats->full_compaction_cnt() + 1 : 1);
if (compaction.output_cumulative_point() > stats->cumulative_point()) {
// After supporting parallel cumu compaction, compaction with
older cumu point may be committed after
// new cumu point has been set, MUST NOT set cumu point back to
old value
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 7936fabe146..f4e57c208ee 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -3812,7 +3812,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) {
extern std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(
int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t
cc_cnt, int64_t req_cp,
- int64_t cp, int64_t req_start, int64_t req_end);
+ int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start,
int64_t req_end);
TEST(MetaServiceTest, CalcSyncVersionsTest) {
using Versions = std::vector<std::pair<int64_t, int64_t>>;
@@ -3828,7 +3828,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{8, 12}}));
}
@@ -3844,7 +3844,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12]
}
@@ -3853,7 +3853,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12]
}
@@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3901,7 +3901,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v
[8, 12]
}
@@ -3917,7 +3917,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}}));
}
@@ -3926,7 +3926,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {8, 8};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12]
}
@@ -3935,7 +3935,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12]
}
@@ -3944,7 +3944,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8,
12]
}
@@ -3953,11 +3953,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
- auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp,
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
// [0, 4] v [5, max] v [8, 12]
ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
}
+
+ {
+ // when there exists full compaction, we can't optimize by "* only one
CC happened and CP changed"
+
+ // * one CC happened and CP changed, and full compaction happened
+ // BE [=][=][=][=][=][=][=][=][=][=]
+ // ^~~~~ req_cp
+ // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
+ // ^~~~~~~ ms_cp
+ // ^___________________________^ related_versions:
[req_cp, max]
+ //
+ auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+ auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+ auto [req_cp, cp] = std::tuple {4, 7};
+ auto [req_start, req_end] = std::tuple {9, 12};
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 1,
+ req_start, req_end);
+ ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
+ }
+
+ {
+ // abnormal case:
+ auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
+ auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
+ auto [req_cp, cp] = std::tuple {4, 7};
+ auto [req_start, req_end] = std::tuple {9, 12};
+ auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt,
cc_cnt, req_cp, cp, 0, 0,
+ req_start, req_end);
+ // when not considering full compaction, the returned versions is
wrong becasue rowsets in [7-8] are missed
+ ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}}));
+ }
}
TEST(MetaServiceTest, StageTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0bde2977a7b..e277c24f8d7 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -816,7 +816,7 @@ message TabletStatsPB {
optional int64 cumulative_point = 9;
optional int64 last_base_compaction_time_ms = 10;
optional int64 last_cumu_compaction_time_ms = 11;
- optional int64 full_compaction_cnt = 12;
+ optional int64 full_compaction_cnt = 12; // used by calc_sync_versions()
only
optional int64 last_full_compaction_time_ms = 13;
optional int64 index_size = 14;
optional int64 segment_size = 15;
@@ -1243,6 +1243,7 @@ message GetRowsetRequest {
// for compability reason we use FILL_WITH_DICT as default
optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT];
optional string request_ip = 9;
+ optional int64 full_compaction_cnt = 10;
}
message GetRowsetResponse {
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
new file mode 100644
index 00000000000..ed930452a5d
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
new file mode 100644
index 00000000000..12bdd2dd007
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_calc_sync_version","docker") {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ docker(options) {
+ def write_cluster = "write_cluster"
+ def read_cluster = "read_cluster"
+
+ // Add two clusters
+ cluster.addBackend(1, write_cluster)
+ cluster.addBackend(1, read_cluster)
+
+ sql "use @${write_cluster}"
+ logger.info("==== switch to write cluster")
+ def tableName = "test_cloud_calc_sync_version"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ properties(
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true")
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+ sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+ sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+ qt_sql "select * from ${tableName} order by k;"
+
+ def check_rs_metas = { tbl, check_func ->
+ def compactionUrl = sql_return_maparray("show tablets from
${tbl};").get(0).CompactionStatus
+ def (code, out, err) = curl("GET", compactionUrl)
+ assert code == 0
+ def jsonMeta = parseJson(out.trim())
+ logger.info("==== rowsets: ${jsonMeta.rowsets}, cumu point:
${jsonMeta["cumulative point"]}")
+ check_func(jsonMeta.rowsets, jsonMeta["cumulative point"])
+ }
+
+ def tabletStats = sql_return_maparray("show tablets from
${tableName};")
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ def backends = sql_return_maparray('show backends')
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host}
with backendId=${tabletBackend.BackendId}");
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def do_cumu_compaction = { def tbl, def tablet_id, int start, int end,
int cp ->
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point",
[tablet_id: "${tablet_id}", cumu_point: "${cp}"])
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
[tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"])
+
+ trigger_and_wait_compaction(tbl, "cumulative")
+
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+ }
+
+ try {
+ // [2-2],[3-3],[4-4] -> [2,4]
+ do_cumu_compaction(tableName, tabletId, 2, 4, 5)
+ qt_sql "select * from ${tableName} order by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 2
+ assert cumu_point as int == 5
+ assert rowsets[1].contains("[2-4]")
+ })
+
+ sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5
+ sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6
+ sql "sync;"
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block",
[tablet_id: "${tabletId}"])
+ def t1 = thread("full compaction") {
+ // [2,4],[5-5],[6-6] -> [2,6]
+ sql "use @${write_cluster}"
+ trigger_and_wait_compaction(tableName, "full")
+ }
+
+ sleep(1500)
+ sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7
+ sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8
+ sql "sync;"
+ qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+ // read cluster sync rowsets [2-4],[5-5],[6-6],[7-7],[8-8],
bc_cnt=0, cc_cnt=1, cp=4
+ sql "use @${read_cluster}"
+ logger.info("==== switch to read cluster")
+ qt_read_cluster_query "select * from ${tableName} order by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 6
+ assert cumu_point as int == 5
+ assert rowsets[1].contains("[2-4]")
+ assert rowsets[2].contains("[5-5]")
+ assert rowsets[3].contains("[6-6]")
+ assert rowsets[4].contains("[7-7]")
+ assert rowsets[5].contains("[8-8]")
+ })
+
+
+ sql "use @${write_cluster}"
+ logger.info("==== switch to write cluster")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block")
+ t1.get()
+ qt_write_cluster_full_compaction "select * from ${tableName} order
by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 4
+ assert cumu_point as int == 7 // updated by full compaction
+ assert rowsets[1].contains("[2-6]")
+ assert rowsets[2].contains("[7-7]")
+ assert rowsets[3].contains("[8-8]")
+ })
+
+
+ do_cumu_compaction(tableName, tabletId, 7, 8, 7)
+ qt_write_cluster_cumu_compaction "select * from ${tableName} order
by k;"
+ check_rs_metas(tableName, {def rowsets, def cumu_point ->
+ assert rowsets.size() == 3
+ assert cumu_point as int == 7
+ assert rowsets[1].contains("[2-6]")
+ assert rowsets[2].contains("[7-8]")
+ })
+
+ sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9
+ sql "sync;"
+ qt_write_cluster_new_write "select * from ${tableName} order by k;"
+
+
+ // read cluster will read dup keys of ver=9 to ver=7 because it
will not sync rowset [7-8]
+ sql "use @${read_cluster}"
+ logger.info("==== switch to read cluster")
+ sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;"
+ qt_read_cluster_check_dup_key "select k,count() from ${tableName}
group by k having count()>1;"
+ qt_read_cluster_res "select * from ${tableName} order by k;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]