This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 32560ae3b35 branch-4.1: [fix](cloud) Normalize SC rowset graph before delete bitmap capture #63960 (#63987)
32560ae3b35 is described below

commit 32560ae3b35d1592d816c6e646bb27d6a8e1a6f1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 3 11:30:36 2026 +0800

    branch-4.1: [fix](cloud) Normalize SC rowset graph before delete bitmap capture #63960 (#63987)
    
    Cherry-picked from #63960
    
    Co-authored-by: bobhan1 <[email protected]>
---
 be/src/cloud/cloud_schema_change_job.cpp | 43 +++++++++---------------------
 be/src/cloud/cloud_tablet.cpp            | 45 +++++++++++++++++++++++++++++---
 be/src/cloud/cloud_tablet.h              | 14 +++++++++-
 be/test/cloud/cloud_tablet_test.cpp      | 44 +++++++++++++++++++++++++++++++
 4 files changed, 111 insertions(+), 35 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 3536d831db7..1107b7717db 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -541,29 +541,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
         std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
         std::unique_lock wlock(_new_tablet->get_header_lock());
-        // Mirror MS behavior: delete rowsets in [2, alter_version] before adding
-        // SC output rowsets to avoid stale compaction rowsets remaining visible.
-        {
-            int64_t alter_ver = sc_job->alter_version();
-            std::vector<RowsetSharedPtr> to_delete;
-            for (auto& [v, rs] : _new_tablet->rowset_map()) {
-                if (v.first >= 2 && v.second <= alter_ver) {
-                    to_delete.push_back(rs);
-                }
-            }
-            if (!to_delete.empty()) {
-                LOG_INFO(
-                        "schema change: delete {} local rowsets in [2, {}] before adding SC "
-                        "output, tablet_id={}, versions=[{}]",
-                        to_delete.size(), alter_ver, _new_tablet->tablet_id(),
-                        fmt::join(to_delete | std::views::transform([](const auto& rs) {
-                                      return rs->version().to_string();
-                                  }),
-                                  ", "));
-                _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
-            }
-        }
-        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
+        _new_tablet->replace_rowsets_with_schema_change_output(
+                _output_rowsets, sc_job->alter_version(), wlock, "commit", true);
         // Ensure the real new tablet has a continuous local version graph before it becomes
         // visible. Later RUNNING-tablet delete bitmap sync depends on capturing all old versions.
         RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().fill_version_holes(
@@ -610,6 +589,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
 
     // step 1, process incremental rowset without delete bitmap update lock
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    {
+        std::unique_lock wlock(tmp_tablet->get_header_lock());
+        tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
+                                                              "delete_bitmap_without_lock", false);
+    }
     int64_t max_version = tmp_tablet->max_version().second;
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
@@ -619,10 +603,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                 {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
         DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
                         DBUG_BLOCK);
-        {
-            std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
-        }
         for (auto rowset : ret.rowsets) {
             RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
         }
@@ -635,6 +615,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
             *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    {
+        std::unique_lock wlock(tmp_tablet->get_header_lock());
+        tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
+                                                              "delete_bitmap_with_lock", false);
+    }
     int64_t new_max_version = tmp_tablet->max_version().second;
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets with lock, version: " << max_version + 1 << "-"
@@ -642,10 +627,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     if (new_max_version > max_version) {
         auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
                 {max_version + 1, new_max_version}, CaptureRowsetOps {}));
-        {
-            std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
-        }
         for (auto rowset : ret.rowsets) {
             RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
         }
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 9ad09b317f1..ee9b4b20e0b 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -94,6 +94,17 @@ bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_c
 
 static constexpr int LOAD_INITIATOR_ID = -1;
 
+namespace {
+
+bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
+                                    const std::vector<RowsetSharedPtr>& output_rowsets) {
+    return std::ranges::any_of(output_rowsets, [&rowset](const RowsetSharedPtr& output_rowset) {
+        return output_rowset->rowset_id() == rowset->rowset_id();
+    });
+}
+
+} // namespace
+
 bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
         "file_cache_cloud_tablet_submitted_segment_size");
 bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
@@ -492,7 +503,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
 }
 
 void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
-                                                   std::unique_lock<std::shared_mutex>&) {
+                                                   std::unique_lock<std::shared_mutex>&,
+                                                   bool recycle_deleted_rowsets) {
     if (to_delete.empty()) {
         return;
     }
@@ -515,8 +527,35 @@ void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetShare
     // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
     _tablet_meta->modify_rs_metas({}, rs_metas, true);
 
-    // Schedule for direct cache cleanup. MS has already recycled these rowsets.
-    add_unused_rowsets(to_delete);
+    if (recycle_deleted_rowsets) {
+        // Schedule for direct cache cleanup. MS has already recycled these rowsets.
+        add_unused_rowsets(to_delete);
+    }
+}
+
+void CloudTablet::replace_rowsets_with_schema_change_output(
+        const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
+        std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+        bool recycle_deleted_rowsets) {
+    std::vector<RowsetSharedPtr> to_delete;
+    for (auto& [v, rs] : _rs_version_map) {
+        if (v.first >= 2 && v.second <= alter_version &&
+            !is_schema_change_output_rowset(rs, output_rowsets)) {
+            to_delete.push_back(rs);
+        }
+    }
+    if (!to_delete.empty()) {
+        LOG_INFO(
+                "schema change: delete {} local rowsets in [2, {}] before adding SC output, "
+                "tablet_id={}, stage={}, versions=[{}]",
+                to_delete.size(), alter_version, tablet_id(), stage,
+                fmt::join(to_delete | std::views::transform([](const auto& rs) {
+                              return rs->version().to_string();
+                          }),
+                          ", "));
+        delete_rowsets_for_schema_change(to_delete, meta_lock, recycle_deleted_rowsets);
+    }
+    add_rowsets(output_rowsets, true, meta_lock, false);
 }
 
 uint64_t CloudTablet::delete_expired_stale_rowsets() {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index c31ff8078e7..bb2645ca552 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -164,7 +164,19 @@ public:
     // preferring stale compaction rowsets over individual SC output rowsets.
     // MUST hold EXCLUSIVE `_meta_lock`.
     void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
-                                          std::unique_lock<std::shared_mutex>& meta_lock);
+                                          std::unique_lock<std::shared_mutex>& meta_lock,
+                                          bool recycle_deleted_rowsets = true);
+
+    // Replace local rowsets in [2, alter_version] with schema change output rowsets.
+    // Existing SC output rowsets are kept; other local/double-write/compaction rowsets
+    // in this version range are removed from both _rs_version_map and version graph.
+    // recycle_deleted_rowsets should only be true for the real tablet; temporary
+    // schema-change delete-bitmap tablets only need to normalize their local graph.
+    // MUST hold EXCLUSIVE `_meta_lock`.
+    void replace_rowsets_with_schema_change_output(
+            const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
+            std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+            bool recycle_deleted_rowsets);
 
     // When the tablet is dropped, we need to recycle cached data:
     // 1. The data in file cache
     // When the tablet is dropped, we need to recycle cached data:
     // 1. The data in file cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
index 3c66f227ab3..23045f20d5e 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -1469,6 +1469,50 @@ TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompa
     }
 }
 
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
+       TestReplaceSchemaChangeOutputCleansPollutedTmpGraph) {
+    auto rs_placeholder = create_rowset(Version(0, 1));
+    auto rs_sc_2 = create_rowset(Version(2, 2));
+    auto rs_sc_3 = create_rowset(Version(3, 3));
+    auto rs_compacted = create_rowset(Version(2, 3));
+    auto rs_post_alter = create_rowset(Version(4, 4));
+    ASSERT_NE(rs_placeholder, nullptr);
+    ASSERT_NE(rs_sc_2, nullptr);
+    ASSERT_NE(rs_sc_3, nullptr);
+    ASSERT_NE(rs_compacted, nullptr);
+    ASSERT_NE(rs_post_alter, nullptr);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs_placeholder, rs_sc_2, rs_sc_3, rs_compacted, rs_post_alter}, false,
+                             wlock, false);
+    }
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 3)));
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->replace_rowsets_with_schema_change_output({rs_sc_2, rs_sc_3}, 3, wlock, "test",
+                                                           false);
+    }
+
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+    ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 3)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(4, 4)));
+    ASSERT_FALSE(_tablet->need_remove_unused_rowsets());
+
+    auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 4), {});
+    ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+    auto& versions = versions_result.value();
+    ASSERT_EQ(versions.size(), 4);
+    ASSERT_EQ(versions[0], Version(0, 1));
+    ASSERT_EQ(versions[1], Version(2, 2));
+    ASSERT_EQ(versions[2], Version(3, 3));
+    ASSERT_EQ(versions[3], Version(4, 4));
+}
+
 // Test that delete_rowsets_for_schema_change with empty input is a no-op
 TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
     auto rs = create_rowset(Version(0, 1));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to