This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b8f09203bd5 branch-4.0: [fix](cloud-compaction) prevent
EMPTY_CUMULATIVE / BASE-CUMU races on the same tablet #64619 (#64701)
b8f09203bd5 is described below
commit b8f09203bd507b81be21b05763e7706969a7b7ae
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 24 18:06:40 2026 +0800
branch-4.0: [fix](cloud-compaction) prevent EMPTY_CUMULATIVE / BASE-CUMU
races on the same tablet #64619 (#64701)
Cherry-picked from #64619
Co-authored-by: Lijia Liu <[email protected]>
Co-authored-by: liutang123 <[email protected]>
---
cloud/src/meta-service/meta_service_job.cpp | 90 ++++++++++--
cloud/test/meta_service_job_test.cpp | 215 ++++++++++++++++++++++++++++
2 files changed, 294 insertions(+), 11 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 2494e91c241..61c14b1767d 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -97,6 +97,55 @@ bool check_compaction_input_verions(const
TabletCompactionJobPB& compaction,
return false;
}
+// Maps a compaction type to the type used when looking for conflicting jobs.
+// EMPTY_CUMULATIVE writes no output rowset (it only bumps cumulative_point and
+// cumulative_compaction_cnt), so it is folded into CUMULATIVE. That way an EMPTY_CUMULATIVE
+// collides with a real CUMULATIVE still running on the same tablet; otherwise it could move
+// cumulative_point past the in-flight cumu's input range and let base compaction race with it.
+// Every other type maps to itself.
+static inline TabletCompactionJobPB::CompactionType normalize_compaction_type_for_conflict(
+        TabletCompactionJobPB::CompactionType t) {
+    if (t != TabletCompactionJobPB::EMPTY_CUMULATIVE) {
+        return t;
+    }
+    return TabletCompactionJobPB::CUMULATIVE;
+}
+
+// Returns true iff `a` and `b` are equal after normalize_compaction_type_for_conflict(), i.e.
+// they are the same type, or one is EMPTY_CUMULATIVE and the other is CUMULATIVE. Nothing else
+// is folded in: FULL only matches FULL, and BASE never matches CUMULATIVE. Callers that need
+// "FULL conflicts with anything" must test for FULL themselves, and cross-type (BASE vs
+// CUMULATIVE) conflicts are only detected on the input-version-range path via may_conflict_by_type().
+static inline bool is_same_conflict_family(TabletCompactionJobPB::CompactionType a,
+                                           TabletCompactionJobPB::CompactionType b) {
+    return normalize_compaction_type_for_conflict(a) == normalize_compaction_type_for_conflict(b);
+}
+
+// Returns true iff `t` is a compaction kind that operates on rowsets bounded by
+// `cumulative_point`: BASE, CUMULATIVE, EMPTY_CUMULATIVE or FULL. Only these kinds take part in
+// the cross-type version-range conflict check. Every other type returns false; STOP_TOKEN is
+// deliberately not listed because start_compaction_job is expected to have dealt with it
+// before the conflict checks run.
+static inline bool is_rowset_compaction_family(TabletCompactionJobPB::CompactionType t) {
+    switch (t) {
+    case TabletCompactionJobPB::BASE:
+    case TabletCompactionJobPB::CUMULATIVE:
+    case TabletCompactionJobPB::EMPTY_CUMULATIVE:
+    case TabletCompactionJobPB::FULL:
+        return true;
+    default:
+        return false;
+    }
+}
+
+// Whether two compaction types MAY conflict on the rowset range, independent of their actual
+// input version ranges. The caller still compares `input_versions` to make the final decision.
+//
+// The relation is symmetric: it is true iff both types are in the rowset compaction family
+// (BASE, CUMULATIVE, EMPTY_CUMULATIVE, FULL). In particular:
+//   FULL             vs any family member : true (full compaction touches the whole rowset range)
+//   BASE             vs BASE / CUMULATIVE : true (their rowset ranges may overlap around cumu_point)
+//   CUMULATIVE       vs CUMULATIVE / BASE : true (symmetric of the above)
+//   EMPTY_CUMULATIVE vs any family member : true (it advances cumu_point and could race with them)
+//
+// Any type outside the family yields false, i.e. it is NOT treated as conflicting here. This is
+// the permissive choice, not a conservative one: whoever adds a new compaction type MUST revisit
+// this function and decide its conflict semantics explicitly.
+static inline bool may_conflict_by_type(TabletCompactionJobPB::CompactionType a,
+                                        TabletCompactionJobPB::CompactionType b) {
+    return is_rowset_compaction_family(a) && is_rowset_compaction_family(b);
+}
+
void start_compaction_job(MetaServiceCode& code, std::string& msg,
std::stringstream& ss,
std::unique_ptr<Transaction>& txn, const
StartTabletJobRequest* request,
StartTabletJobResponse* response, std::string&
instance_id,
@@ -244,10 +293,16 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
!compaction.check_input_versions_range())) {
- // Unknown input version range, doesn't support parallel
compaction of same type
+ // Unknown input version range, doesn't support parallel
compaction of same family.
+ // EMPTY_CUMULATIVE is normalized to CUMULATIVE here so it
conflicts with an in-flight
+ // CUMULATIVE on the same tablet (otherwise EMPTY_CUMULATIVE could
advance
+ // cumulative_point past the in-flight cumu's input range and let
base compaction race
+ // with it).
for (auto& c : compactions) {
- if (c.type() != compaction.type() && c.type() !=
TabletCompactionJobPB::FULL)
+ if (!is_same_conflict_family(c.type(), compaction.type()) &&
+ c.type() != TabletCompactionJobPB::FULL) {
continue;
+ }
if (c.id() == compaction.id()) return; // Same job, return OK
to keep idempotency
msg = fmt::format("compaction has already started,
tablet_id={} job={}", tablet_id,
proto_to_json(c));
@@ -264,8 +319,14 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
a.input_versions(1) < b.input_versions(0);
};
for (auto& c : compactions) {
- if (c.type() != compaction.type() && c.type() !=
TabletCompactionJobPB::FULL)
- continue;
+ // Plan D: BASE and CUMULATIVE on the same tablet may also
conflict when their
+ // input version ranges overlap. Previously we only checked
same-type conflicts,
+ // which left a window where BASE could be accepted with
versions that overlap an
+ // in-flight CUMULATIVE (and vice versa) after
cumulative_point was unsafely
+ // advanced. Now we treat any pair within (BASE, CUMULATIVE,
EMPTY_CUMULATIVE,
+ // FULL) as potentially conflicting and rely on the
input-version-range check
+ // below to make the final decision.
+ if (!may_conflict_by_type(c.type(), compaction.type()))
continue;
if (c.input_versions_size() > 0 && version_not_conflict(c,
compaction)) continue;
if (c.id() == compaction.id()) return; // Same job, return OK
to keep idempotency
msg = fmt::format("compaction has already started,
tablet_id={} job={}", tablet_id,
@@ -273,14 +334,21 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
code = MetaServiceCode::JOB_TABLET_BUSY;
// Unknown version range of started compaction, BE should not
retry other version range
if (c.input_versions_size() == 0) return;
- // Notify version ranges in started compaction to BE, so BE
can retry other version range
+ // Notify version ranges of all in-flight compactions that may
conflict with the
+ // incoming one, so BE can retry on a non-overlapping range.
The notification
+ // predicate is intentionally kept consistent with the
conflict predicate above
+ // (`may_conflict_by_type`); previously only same-family
ranges were surfaced,
+ // which left BE blind to cross-family (BASE vs CUMULATIVE)
conflicts.
+ //
+ // An in-flight EMPTY_CUMULATIVE (or any other family member
without a concrete
+ // [v_lo, v_hi]) carries no usable range; surfacing fabricated
zeros would
+ // mislead BE retry. Skip such entries defensively here - the
real conflict is
+ // already enforced by the version-range check above.
for (auto& c : compactions) {
- if (c.type() == compaction.type() || c.type() ==
TabletCompactionJobPB::FULL) {
- // If there are multiple started compaction of same
type, they all must has input version range
- DCHECK_EQ(c.input_versions_size(), 2) <<
proto_to_json(c);
-
response->add_version_in_compaction(c.input_versions(0));
-
response->add_version_in_compaction(c.input_versions(1));
- }
+ if (!may_conflict_by_type(c.type(), compaction.type()))
continue;
+ if (c.input_versions_size() != 2) continue;
+ response->add_version_in_compaction(c.input_versions(0));
+ response->add_version_in_compaction(c.input_versions(1));
}
return;
}
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index ddba9b4c750..a84d33d9208 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4545,6 +4545,221 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
+// Regression test: EMPTY_CUMULATIVE must be treated as the same conflict family as CUMULATIVE,
+// so an EMPTY_CUMULATIVE submitted while a real CUMULATIVE is still active on the same tablet is
+// rejected with JOB_TABLET_BUSY. Otherwise EMPTY_CUMULATIVE could advance cumulative_point past
+// the in-flight cumu's input range and let base compaction race with it.
+TEST(MetaServiceJobTest, EmptyCumulativeBlockedByCumulativeTest) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    constexpr int64_t table_id = 1;
+    constexpr int64_t index_id = 2;
+    constexpr int64_t partition_id = 3;
+    constexpr int64_t tablet_id = 4;
+    ASSERT_NO_FATAL_FAILURE(
+            create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false));
+
+    // Helper that submits an EMPTY_CUMULATIVE job. The request carries no input_versions and no
+    // expiration, only a lease, so MS handles it on its unknown-version-range path. Callers pass
+    // the same base/cumulative compaction counters as the in-flight cumu so that (presumably) the
+    // request is judged by the conflict check rather than rejected as STALE_TABLET_CACHE.
+    auto start_empty_cumu = [&](const std::string& job_id, const std::string& initiator,
+                                int base_cnt, int cumu_cnt, StartTabletJobResponse& res) {
+        brpc::Controller cntl;
+        StartTabletJobRequest req;
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        auto* compaction = req.mutable_job()->add_compaction();
+        compaction->set_id(job_id);
+        compaction->set_initiator(initiator);
+        compaction->set_base_compaction_cnt(base_cnt);
+        compaction->set_cumulative_compaction_cnt(cumu_cnt);
+        compaction->set_type(TabletCompactionJobPB::EMPTY_CUMULATIVE);
+        long now = ::time(nullptr);
+        compaction->set_lease(now + 3);
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+    };
+
+    // Step 1: An in-flight CUMULATIVE job [42326-42474] is registered first (mimics the
+    // scenario from the production log).
+    StartTabletJobResponse res;
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {42326, 42474});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 2: An EMPTY_CUMULATIVE arrives carrying the same cumulative_compaction_cnt as cumu1.
+    // Before the fix it was wrongly accepted because MS only compared raw enum types; now
+    // EMPTY_CUMULATIVE is normalized to CUMULATIVE for conflict detection and rejected as
+    // JOB_TABLET_BUSY.
+    res.Clear();
+    start_empty_cumu("empty1", "BE1", 0, 0, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+    // The rejection comes from the unknown-version-range path, so BE must NOT receive any
+    // version range hint (a `version_in_compaction` retry is meaningless for EMPTY_CUMULATIVE).
+    EXPECT_EQ(res.version_in_compaction_size(), 0);
+
+    // Step 3: Idempotency check - resubmitting the same job_id must still return OK.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {42326, 42474});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 4: A BASE compaction arrives without `input_versions`, i.e. through the same
+    // unknown-version-range path as EMPTY_CUMULATIVE. is_same_conflict_family(BASE, CUMULATIVE)
+    // is false and cumu1 is not FULL, so BASE is still accepted here; cross-type conflicts are
+    // only enforced on the version-range path (see BaseCumulativeCrossTypeConflictTest).
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "base1", "BE1", 0, 0,
+                         TabletCompactionJobPB::BASE, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 5: A second EMPTY_CUMULATIVE must also be rejected. empty1 was rejected in Step 2, so
+    // the active jobs are cumu1 and base1; cumu1 is what blocks it through the same-family check
+    // (base1 is BASE, a different family). As in Step 2, no version range hint is returned.
+    res.Clear();
+    start_empty_cumu("empty2", "BE2", 0, 0, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+    EXPECT_EQ(res.version_in_compaction_size(), 0);
+}
+
+// Regression test for cross-type conflicts: when a CUMULATIVE compaction with `input_versions`
+// and `check_input_versions_range = true` is already running, a BASE compaction whose version
+// range overlaps it must be rejected with JOB_TABLET_BUSY, and the conflict must be symmetric
+// (a CUMULATIVE overlapping an in-flight BASE is rejected too). Non-overlapping jobs are still
+// allowed, which covers the typical safe case (BASE handles [0, cumu_point - 1] while
+// CUMULATIVE handles versions above cumu_point).
+TEST(MetaServiceJobTest, BaseCumulativeCrossTypeConflictTest) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    constexpr int64_t table_id = 1;
+    constexpr int64_t index_id = 2;
+    constexpr int64_t partition_id = 3;
+    constexpr int64_t tablet_id = 4;
+    ASSERT_NO_FATAL_FAILURE(
+            create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false));
+
+    // Local helper: submit a BASE compaction request that carries `input_versions` (matching
+    // production BE behaviour, where cloud_base_compaction.cpp always calls add_input_versions).
+    // check_input_versions_range is left UNSET. With the field unset and input_versions
+    // non-empty, MS routes the request into its version-range-aware branch, which is the branch
+    // under test here.
+    auto start_base = [&](const std::string& job_id, const std::string& initiator, int base_cnt,
+                          int cumu_cnt, std::pair<int64_t, int64_t> versions,
+                          StartTabletJobResponse& res) {
+        brpc::Controller cntl;
+        StartTabletJobRequest req;
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        auto* compaction = req.mutable_job()->add_compaction();
+        compaction->set_id(job_id);
+        compaction->set_initiator(initiator);
+        compaction->set_base_compaction_cnt(base_cnt);
+        compaction->set_cumulative_compaction_cnt(cumu_cnt);
+        compaction->set_type(TabletCompactionJobPB::BASE);
+        long now = ::time(nullptr);
+        compaction->set_expiration(now + 12);
+        compaction->set_lease(now + 3);
+        compaction->add_input_versions(versions.first);
+        compaction->add_input_versions(versions.second);
+        // Intentionally NOT calling set_check_input_versions_range. Explicitly setting it to
+        // false would send the request to the unknown-version-range branch instead.
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+    };
+
+    // Step 1: A CUMULATIVE compaction with versions [10, 20] is started with parallel-cumu
+    // mode enabled (check_input_versions_range = true). This routes into the
+    // version-range-aware branch on MS.
+    StartTabletJobResponse res;
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {10, 20});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 2: BASE [5, 15] overlaps with CUMULATIVE [10, 20] and must be rejected. Before the fix
+    // it would succeed, because the old `c.type() != compaction.type()` check skipped the active
+    // CUMULATIVE for a BASE submission.
+    res.Clear();
+    start_base("base_overlap_left", "BE1", 0, 0, {5, 15}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+    // The conflicting in-flight CUMULATIVE range is reported back even though the incoming job
+    // is a BASE (cross-type notification), so BE can retry on a non-overlapping range.
+    ASSERT_EQ(res.version_in_compaction_size(), 2);
+    EXPECT_EQ(res.version_in_compaction(0), 10);
+    EXPECT_EQ(res.version_in_compaction(1), 20);
+
+    // Step 3: BASE [15, 25] also overlaps. Should be rejected.
+    res.Clear();
+    start_base("base_overlap_right", "BE1", 0, 0, {15, 25}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+
+    // Step 4: BASE [12, 18] is fully contained inside CUMULATIVE's range. Should be rejected.
+    res.Clear();
+    start_base("base_overlap_inside", "BE1", 0, 0, {12, 18}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+
+    // Step 5: BASE [5, 25] fully covers the CUMULATIVE range. Should be rejected.
+    res.Clear();
+    start_base("base_overlap_cover", "BE1", 0, 0, {5, 25}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+
+    // Step 6: BASE [0, 9] is BELOW the CUMULATIVE range. This is the typical safe case
+    // (base handles [0, cumu_point - 1]) and must still be accepted with cross-type checks on.
+    res.Clear();
+    start_base("base_safe_below", "BE1", 0, 0, {0, 9}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    // Step 7: A second BASE [21, 30] is ABOVE the CUMULATIVE range AND does not overlap the
+    // already-accepted base_safe_below [0, 9]. This is also a safe non-overlap case. It is
+    // unusual in production (BASE rarely operates above cumu_point), but MS should accept it.
+    res.Clear();
+    start_base("base_safe_above", "BE2", 0, 0, {21, 30}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    // Step 8: A new CUMULATIVE [22, 28] overlaps with the just-accepted base_safe_above and
+    // must be rejected. Verifies the conflict is symmetric - CUMULATIVE submissions also
+    // see BASE jobs as conflicting.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {22, 28});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+    // The version_in_compaction notification predicate matches the conflict predicate
+    // (`may_conflict_by_type`). Every in-flight job in the rowset compaction family
+    // (BASE / CUMULATIVE) is reported, so BE can pick a non-overlapping range to retry.
+    // Active jobs at this point: cumu1[10,20], base_safe_below[0,9], base_safe_above[21,30].
+    // All three carry concrete input_versions, so all three must be reported.
+    ASSERT_EQ(res.version_in_compaction_size(), 6);
+    EXPECT_EQ(res.version_in_compaction(0), 10);
+    EXPECT_EQ(res.version_in_compaction(1), 20);
+    EXPECT_EQ(res.version_in_compaction(2), 0);
+    EXPECT_EQ(res.version_in_compaction(3), 9);
+    EXPECT_EQ(res.version_in_compaction(4), 21);
+    EXPECT_EQ(res.version_in_compaction(5), 30);
+
+    // Step 9: A new CUMULATIVE [30, 35] does not overlap with cumu1 [10, 20] but DOES overlap
+    // with base_safe_above [21, 30] (sharing version 30). Must be rejected.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base2", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {30, 35});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << res.status().msg();
+
+    // Step 10: A new CUMULATIVE [31, 40] is fully above all active jobs and must be accepted.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_safe_above", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {31, 40});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+}
+
TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) {
auto meta_service = get_meta_service();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]