This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 32560ae3b35 branch-4.1: [fix](cloud) Normalize SC rowset graph before delete bitmap capture #63960 (#63987)
32560ae3b35 is described below
commit 32560ae3b35d1592d816c6e646bb27d6a8e1a6f1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 3 11:30:36 2026 +0800
branch-4.1: [fix](cloud) Normalize SC rowset graph before delete bitmap capture #63960 (#63987)
Cherry-picked from #63960
Co-authored-by: bobhan1 <[email protected]>
---
be/src/cloud/cloud_schema_change_job.cpp | 43 +++++++++---------------------
be/src/cloud/cloud_tablet.cpp | 45 +++++++++++++++++++++++++++++---
be/src/cloud/cloud_tablet.h | 14 +++++++++-
be/test/cloud/cloud_tablet_test.cpp | 44 +++++++++++++++++++++++++++++++
4 files changed, 111 insertions(+), 35 deletions(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 3536d831db7..1107b7717db 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -541,29 +541,8 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in
another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
- // Mirror MS behavior: delete rowsets in [2, alter_version] before
adding
- // SC output rowsets to avoid stale compaction rowsets remaining
visible.
- {
- int64_t alter_ver = sc_job->alter_version();
- std::vector<RowsetSharedPtr> to_delete;
- for (auto& [v, rs] : _new_tablet->rowset_map()) {
- if (v.first >= 2 && v.second <= alter_ver) {
- to_delete.push_back(rs);
- }
- }
- if (!to_delete.empty()) {
- LOG_INFO(
- "schema change: delete {} local rowsets in [2, {}]
before adding SC "
- "output, tablet_id={}, versions=[{}]",
- to_delete.size(), alter_ver, _new_tablet->tablet_id(),
- fmt::join(to_delete | std::views::transform([](const
auto& rs) {
- return rs->version().to_string();
- }),
- ", "));
- _new_tablet->delete_rowsets_for_schema_change(to_delete,
wlock);
- }
- }
- _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock,
false);
+ _new_tablet->replace_rowsets_with_schema_change_output(
+ _output_rowsets, sc_job->alter_version(), wlock, "commit",
true);
// Ensure the real new tablet has a continuous local version graph
before it becomes
// visible. Later RUNNING-tablet delete bitmap sync depends on
capturing all old versions.
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().fill_version_holes(
@@ -610,6 +589,11 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
// step 1, process incremental rowset without delete bitmap update lock
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+ {
+ std::unique_lock wlock(tmp_tablet->get_header_lock());
+ tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets,
alter_version, wlock,
+
"delete_bitmap_without_lock", false);
+ }
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " <<
start_calc_delete_bitmap_version
@@ -619,10 +603,6 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
{start_calc_delete_bitmap_version, max_version},
CaptureRowsetOps {}));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
- {
- std::unique_lock wlock(tmp_tablet->get_header_lock());
- tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
- }
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
@@ -635,6 +615,11 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+ {
+ std::unique_lock wlock(tmp_tablet->get_header_lock());
+ tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets,
alter_version, wlock,
+
"delete_bitmap_with_lock", false);
+ }
int64_t new_max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1
<< "-"
@@ -642,10 +627,6 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
if (new_max_version > max_version) {
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
- {
- std::unique_lock wlock(tmp_tablet->get_header_lock());
- tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
- }
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 9ad09b317f1..ee9b4b20e0b 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -94,6 +94,17 @@ bvar::Window<bvar::Adder<int64_t>>
g_capture_with_freshness_tolerance_fallback_c
static constexpr int LOAD_INITIATOR_ID = -1;
+namespace {
+
+bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
+ const std::vector<RowsetSharedPtr>&
output_rowsets) {
+ return std::ranges::any_of(output_rowsets, [&rowset](const
RowsetSharedPtr& output_rowset) {
+ return output_rowset->rowset_id() == rowset->rowset_id();
+ });
+}
+
+} // namespace
+
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
"file_cache_cloud_tablet_submitted_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
@@ -492,7 +503,8 @@ void CloudTablet::delete_rowsets(const
std::vector<RowsetSharedPtr>& to_delete,
}
void CloudTablet::delete_rowsets_for_schema_change(const
std::vector<RowsetSharedPtr>& to_delete,
-
std::unique_lock<std::shared_mutex>&) {
+
std::unique_lock<std::shared_mutex>&,
+ bool
recycle_deleted_rowsets) {
if (to_delete.empty()) {
return;
}
@@ -515,8 +527,35 @@ void CloudTablet::delete_rowsets_for_schema_change(const
std::vector<RowsetShare
// the other hits a DCHECK(false) in delete_expired_stale_rowsets().
_tablet_meta->modify_rs_metas({}, rs_metas, true);
- // Schedule for direct cache cleanup. MS has already recycled these
rowsets.
- add_unused_rowsets(to_delete);
+ if (recycle_deleted_rowsets) {
+ // Schedule for direct cache cleanup. MS has already recycled these
rowsets.
+ add_unused_rowsets(to_delete);
+ }
+}
+
+void CloudTablet::replace_rowsets_with_schema_change_output(
+ const std::vector<RowsetSharedPtr>& output_rowsets, int64_t
alter_version,
+ std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+ bool recycle_deleted_rowsets) {
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _rs_version_map) {
+ if (v.first >= 2 && v.second <= alter_version &&
+ !is_schema_change_output_rowset(rs, output_rowsets)) {
+ to_delete.push_back(rs);
+ }
+ }
+ if (!to_delete.empty()) {
+ LOG_INFO(
+ "schema change: delete {} local rowsets in [2, {}] before
adding SC output, "
+ "tablet_id={}, stage={}, versions=[{}]",
+ to_delete.size(), alter_version, tablet_id(), stage,
+ fmt::join(to_delete | std::views::transform([](const auto& rs)
{
+ return rs->version().to_string();
+ }),
+ ", "));
+ delete_rowsets_for_schema_change(to_delete, meta_lock,
recycle_deleted_rowsets);
+ }
+ add_rowsets(output_rowsets, true, meta_lock, false);
}
uint64_t CloudTablet::delete_expired_stale_rowsets() {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index c31ff8078e7..bb2645ca552 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -164,7 +164,19 @@ public:
// preferring stale compaction rowsets over individual SC output rowsets.
// MUST hold EXCLUSIVE `_meta_lock`.
void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>&
to_delete,
- std::unique_lock<std::shared_mutex>&
meta_lock);
+ std::unique_lock<std::shared_mutex>&
meta_lock,
+ bool recycle_deleted_rowsets = true);
+
+ // Replace local rowsets in [2, alter_version] with schema change output
rowsets.
+ // Existing SC output rowsets are kept; other
local/double-write/compaction rowsets
+ // in this version range are removed from both _rs_version_map and version
graph.
+ // recycle_deleted_rowsets should only be true for the real tablet;
temporary
+ // schema-change delete-bitmap tablets only need to normalize their local
graph.
+ // MUST hold EXCLUSIVE `_meta_lock`.
+ void replace_rowsets_with_schema_change_output(
+ const std::vector<RowsetSharedPtr>& output_rowsets, int64_t
alter_version,
+ std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+ bool recycle_deleted_rowsets);
// When the tablet is dropped, we need to recycle cached data:
// 1. The data in file cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp
b/be/test/cloud/cloud_tablet_test.cpp
index 3c66f227ab3..23045f20d5e 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -1469,6 +1469,50 @@ TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestSchemaChangeDeletesCompa
}
}
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
+ TestReplaceSchemaChangeOutputCleansPollutedTmpGraph) {
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_sc_2 = create_rowset(Version(2, 2));
+ auto rs_sc_3 = create_rowset(Version(3, 3));
+ auto rs_compacted = create_rowset(Version(2, 3));
+ auto rs_post_alter = create_rowset(Version(4, 4));
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_sc_2, nullptr);
+ ASSERT_NE(rs_sc_3, nullptr);
+ ASSERT_NE(rs_compacted, nullptr);
+ ASSERT_NE(rs_post_alter, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_sc_2, rs_sc_3, rs_compacted,
rs_post_alter}, false,
+ wlock, false);
+ }
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 3)));
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->replace_rowsets_with_schema_change_output({rs_sc_2, rs_sc_3},
3, wlock, "test",
+ false);
+ }
+
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 3)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(4, 4)));
+ ASSERT_FALSE(_tablet->need_remove_unused_rowsets());
+
+ auto versions_result =
_tablet->capture_consistent_versions_unlocked(Version(0, 4), {});
+ ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+ auto& versions = versions_result.value();
+ ASSERT_EQ(versions.size(), 4);
+ ASSERT_EQ(versions[0], Version(0, 1));
+ ASSERT_EQ(versions[1], Version(2, 2));
+ ASSERT_EQ(versions[2], Version(3, 3));
+ ASSERT_EQ(versions[3], Version(4, 4));
+}
+
// Test that delete_rowsets_for_schema_change with empty input is a no-op
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
auto rs = create_rowset(Version(0, 1));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]