This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c9c3667182 [fix](cloud) Normalize SC rowset graph before delete bitmap capture (#63960)
7c9c3667182 is described below

commit 7c9c366718206f7651213e97a34cb770aec51881
Author: bobhan1 <[email protected]>
AuthorDate: Tue Jun 2 11:08:10 2026 +0800

    [fix](cloud) Normalize SC rowset graph before delete bitmap capture (#63960)
    
    ## Proposed changes
    
    This PR fixes the merge-on-write (MOW) schema-change delete-bitmap path
    that #62256 left unfixed.
    
    #62256, whose master commit is
    `dd59f479af5a855401e3f862c751e8416070a1e2`, fixed the final
    schema-change commit path by deleting local rowsets in `[2,
    alter_version]` before adding the schema-change output rowsets to the
    real new tablet. That keeps the committed tablet rowset graph aligned
    with the Meta Service result.
    
    However, the delete-bitmap recompute path still builds and uses a
    temporary tablet in `CloudSchemaChangeJob::_process_delete_bitmap()`.
    That temporary tablet is initialized with the schema-change output
    rowsets, but after each `sync_tablet_rowsets(tmp_tablet)` it can again
    contain non-schema-change local rowsets in `[2, alter_version]`, such as
    double-write rowsets or compaction output rowsets.
    
    If the temporary tablet graph contains both:
    
    - schema-change output rowsets, for example `[2]`, `[3]`, ...
    - a wider local/compaction rowset, for example `[2-3]`
    
    then `capture_consistent_rowsets()` can choose the wider
    non-schema-change rowset from the temporary graph instead of the
    schema-change output rowsets. The delete bitmap is then recomputed
    against a rowset path that is not the one finally committed for the
    schema-changed tablet. A later MOW compaction may observe delete-bitmap
    coverage inconsistent with the visible rowset graph and fail
    row-count/delete-bitmap correctness checks.
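    
    As a rough illustration of that selection, here is a standalone sketch.
    It is not the Doris version graph: `capture_path` and its widest-first
    rule are assumptions made for this example only. Over `[0-1]`, `[2]`,
    `[3]`, `[2-3]`, `[4]` it takes `[2-3]` rather than `[2]` and `[3]`:
    
    ```cpp
    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <utility>
    #include <vector>
    
    using Version = std::pair<int64_t, int64_t>; // inclusive [start, end]
    
    // Hypothetical greedy capture: from each start version, take the widest
    // rowset that does not overshoot the target end version.
    std::optional<std::vector<Version>> capture_path(const std::vector<Version>& rowsets,
                                                     Version target) {
        assert(target.first <= target.second);
        std::vector<Version> path;
        int64_t next = target.first;
        while (next <= target.second) {
            std::optional<Version> best;
            for (const auto& v : rowsets) {
                if (v.first == next && v.second <= target.second &&
                    (!best || v.second > best->second)) {
                    best = v;
                }
            }
            if (!best) {
                return std::nullopt; // version hole: no rowset starts at `next`
            }
            path.push_back(*best);
            next = best->second + 1;
        }
        return path;
    }
    ```
    
    With `[2-3]` removed from the input, the same call returns the
    single-version path `[0-1]`, `[2]`, `[3]`, `[4]`.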
    
    The fix is to normalize the temporary tablet rowset graph immediately
    after every `sync_tablet_rowsets(tmp_tablet)` and before capturing
    rowsets for delete-bitmap recomputation.
    
    Concretely this PR:
    
    - extracts `CloudTablet::replace_rowsets_with_schema_change_output()`;
    - removes non-schema-change local rowsets in `[2, alter_version]` from
    both `_rs_version_map` and the version graph before adding schema-change
    output rowsets;
    - reuses the helper in the real schema-change commit path;
    - calls the same helper after both tmp-tablet syncs in
    `_process_delete_bitmap()`;
    - keeps cache/delete-bitmap cleanup only for the real tablet, while the
    temporary tablet only normalizes its local graph;
    - adds a unit test that simulates a polluted tmp graph with `[2]`,
    `[3]`, and a stale compaction rowset `[2-3]`.
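    
    The normalization step can be pictured with the standalone sketch below.
    The `Rowset` struct and `normalize_graph` are hypothetical stand-ins
    written for this note, not the helper added by this PR; they only mirror
    the rule of keeping schema-change outputs (matched by id) and dropping
    everything else inside `[2, alter_version]`:
    
    ```cpp
    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <vector>
    
    // Simplified stand-in for a rowset: an id plus an inclusive version range.
    struct Rowset {
        std::string id;
        int64_t start;
        int64_t end;
    };
    
    // Hypothetical helper: drop every rowset inside [2, alter_version] that is
    // not a schema-change output, then add any outputs that are still missing.
    std::vector<Rowset> normalize_graph(std::vector<Rowset> local,
                                        const std::vector<Rowset>& sc_output,
                                        int64_t alter_version) {
        assert(alter_version >= 2);
        auto has_id = [](const std::vector<Rowset>& rowsets, const std::string& id) {
            return std::any_of(rowsets.begin(), rowsets.end(),
                               [&id](const Rowset& rs) { return rs.id == id; });
        };
        local.erase(std::remove_if(local.begin(), local.end(),
                                   [&](const Rowset& rs) {
                                       return rs.start >= 2 && rs.end <= alter_version &&
                                              !has_id(sc_output, rs.id);
                                   }),
                    local.end());
        for (const Rowset& out : sc_output) {
            if (!has_id(local, out.id)) {
                local.push_back(out);
            }
        }
        return local;
    }
    ```
    
    Rowsets outside the range, such as `[0-1]` or a post-alter `[4]`, are
    left untouched by this rule.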
    
    ## Root cause
    
    #62256 fixed the final commit graph but not the earlier delete-bitmap
    recompute graph.
    
    The final tablet graph and the temporary delete-bitmap tablet graph must
    use the same schema-change output rowset path for historical versions.
    Otherwise delete bitmap recomputation may be based on a different rowset
    path from the one that becomes visible after schema change.
    
    This is why the issue can surface in a compaction after schema change
    has finished: the compaction output itself does not need to contain
    duplicate rows. The failure comes from delete bitmap state being
    recomputed from a polluted temporary rowset graph and later being
    applied to the committed schema-change graph.
    
    ## Testing
    
    ```
    ./run-be-ut.sh --run --filter=CloudTabletDeleteRowsetsForSchemaChangeTest.* -j100
    ```
---
 be/src/cloud/cloud_schema_change_job.cpp | 43 +++++++++---------------------
 be/src/cloud/cloud_tablet.cpp            | 45 +++++++++++++++++++++++++++++---
 be/src/cloud/cloud_tablet.h              | 14 +++++++++-
 be/test/cloud/cloud_tablet_test.cpp      | 44 +++++++++++++++++++++++++++++++
 4 files changed, 111 insertions(+), 35 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 018102654e0..9b0d0fcc379 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -550,29 +550,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
         std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
         std::unique_lock wlock(_new_tablet->get_header_lock());
-        // Mirror MS behavior: delete rowsets in [2, alter_version] before adding
-        // SC output rowsets to avoid stale compaction rowsets remaining visible.
-        {
-            int64_t alter_ver = sc_job->alter_version();
-            std::vector<RowsetSharedPtr> to_delete;
-            for (auto& [v, rs] : _new_tablet->rowset_map()) {
-                if (v.first >= 2 && v.second <= alter_ver) {
-                    to_delete.push_back(rs);
-                }
-            }
-            if (!to_delete.empty()) {
-                LOG_INFO(
-                        "schema change: delete {} local rowsets in [2, {}] before adding SC "
-                        "output, tablet_id={}, versions=[{}]",
-                        to_delete.size(), alter_ver, _new_tablet->tablet_id(),
-                        fmt::join(to_delete | std::views::transform([](const auto& rs) {
-                                      return rs->version().to_string();
-                                  }),
-                                  ", "));
-                _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
-            }
-        }
-        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
+        _new_tablet->replace_rowsets_with_schema_change_output(
+                _output_rowsets, sc_job->alter_version(), wlock, "commit", true);
         // Ensure the real new tablet has a continuous local version graph before it becomes
         // visible. Later RUNNING-tablet delete bitmap sync depends on capturing all old versions.
         RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().fill_version_holes(
@@ -619,6 +598,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
 
     // step 1, process incremental rowset without delete bitmap update lock
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    {
+        std::unique_lock wlock(tmp_tablet->get_header_lock());
+        tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
+                                                              "delete_bitmap_without_lock", false);
+    }
     int64_t max_version = tmp_tablet->max_version().second;
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
@@ -628,10 +612,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                 {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
         DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
                         DBUG_BLOCK);
-        {
-            std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
-        }
         for (auto rowset : ret.rowsets) {
             RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
         }
@@ -644,6 +624,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
             *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    {
+        std::unique_lock wlock(tmp_tablet->get_header_lock());
+        tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
+                                                              "delete_bitmap_with_lock", false);
+    }
     int64_t new_max_version = tmp_tablet->max_version().second;
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets with lock, version: " << max_version + 1 << "-"
@@ -651,10 +636,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     if (new_max_version > max_version) {
         auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
                 {max_version + 1, new_max_version}, CaptureRowsetOps {}));
-        {
-            std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
-        }
         for (auto rowset : ret.rowsets) {
             RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
         }
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 339a5a757a2..52489d1448c 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -93,6 +93,17 @@ bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_c
 
 static constexpr int LOAD_INITIATOR_ID = -1;
 
+namespace {
+
+bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
+                                    const std::vector<RowsetSharedPtr>& output_rowsets) {
+    return std::ranges::any_of(output_rowsets, [&rowset](const RowsetSharedPtr& output_rowset) {
+        return output_rowset->rowset_id() == rowset->rowset_id();
+    });
+}
+
+} // namespace
+
 bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
         "file_cache_cloud_tablet_submitted_segment_size");
 bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
@@ -491,7 +502,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
 }
 
 void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
-                                                   std::unique_lock<std::shared_mutex>&) {
+                                                   std::unique_lock<std::shared_mutex>&,
+                                                   bool recycle_deleted_rowsets) {
     if (to_delete.empty()) {
         return;
     }
@@ -514,8 +526,35 @@ void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetShare
     // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
     _tablet_meta->modify_rs_metas({}, rs_metas, true);
 
-    // Schedule for direct cache cleanup. MS has already recycled these rowsets.
-    add_unused_rowsets(to_delete);
+    if (recycle_deleted_rowsets) {
+        // Schedule for direct cache cleanup. MS has already recycled these rowsets.
+        add_unused_rowsets(to_delete);
+    }
+}
+
+void CloudTablet::replace_rowsets_with_schema_change_output(
+        const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
+        std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+        bool recycle_deleted_rowsets) {
+    std::vector<RowsetSharedPtr> to_delete;
+    for (auto& [v, rs] : _rs_version_map) {
+        if (v.first >= 2 && v.second <= alter_version &&
+            !is_schema_change_output_rowset(rs, output_rowsets)) {
+            to_delete.push_back(rs);
+        }
+    }
+    if (!to_delete.empty()) {
+        LOG_INFO(
+                "schema change: delete {} local rowsets in [2, {}] before adding SC output, "
+                "tablet_id={}, stage={}, versions=[{}]",
+                to_delete.size(), alter_version, tablet_id(), stage,
+                fmt::join(to_delete | std::views::transform([](const auto& rs) {
+                              return rs->version().to_string();
+                          }),
+                          ", "));
+        delete_rowsets_for_schema_change(to_delete, meta_lock, recycle_deleted_rowsets);
+    }
+    add_rowsets(output_rowsets, true, meta_lock, false);
 }
 
 uint64_t CloudTablet::delete_expired_stale_rowsets() {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 807ca6207c4..93ffc40488d 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -164,7 +164,19 @@ public:
     // preferring stale compaction rowsets over individual SC output rowsets.
     // MUST hold EXCLUSIVE `_meta_lock`.
     void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
-                                          std::unique_lock<std::shared_mutex>& meta_lock);
+                                          std::unique_lock<std::shared_mutex>& meta_lock,
+                                          bool recycle_deleted_rowsets = true);
+
+    // Replace local rowsets in [2, alter_version] with schema change output rowsets.
+    // Existing SC output rowsets are kept; other local/double-write/compaction rowsets
+    // in this version range are removed from both _rs_version_map and version graph.
+    // recycle_deleted_rowsets should only be true for the real tablet; temporary
+    // schema-change delete-bitmap tablets only need to normalize their local graph.
+    // MUST hold EXCLUSIVE `_meta_lock`.
+    void replace_rowsets_with_schema_change_output(
+            const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
+            std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
+            bool recycle_deleted_rowsets);
 
     // When the tablet is dropped, we need to recycle cached data:
     // 1. The data in file cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
index 3c66f227ab3..23045f20d5e 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -1469,6 +1469,50 @@ TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompa
     }
 }
 
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
+       TestReplaceSchemaChangeOutputCleansPollutedTmpGraph) {
+    auto rs_placeholder = create_rowset(Version(0, 1));
+    auto rs_sc_2 = create_rowset(Version(2, 2));
+    auto rs_sc_3 = create_rowset(Version(3, 3));
+    auto rs_compacted = create_rowset(Version(2, 3));
+    auto rs_post_alter = create_rowset(Version(4, 4));
+    ASSERT_NE(rs_placeholder, nullptr);
+    ASSERT_NE(rs_sc_2, nullptr);
+    ASSERT_NE(rs_sc_3, nullptr);
+    ASSERT_NE(rs_compacted, nullptr);
+    ASSERT_NE(rs_post_alter, nullptr);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs_placeholder, rs_sc_2, rs_sc_3, rs_compacted, rs_post_alter}, false,
+                             wlock, false);
+    }
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 3)));
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->replace_rowsets_with_schema_change_output({rs_sc_2, rs_sc_3}, 3, wlock, "test",
+                                                           false);
+    }
+
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
+    ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 3)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(4, 4)));
+    ASSERT_FALSE(_tablet->need_remove_unused_rowsets());
+
+    auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 4), {});
+    ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+    auto& versions = versions_result.value();
+    ASSERT_EQ(versions.size(), 4);
+    ASSERT_EQ(versions[0], Version(0, 1));
+    ASSERT_EQ(versions[1], Version(2, 2));
+    ASSERT_EQ(versions[2], Version(3, 3));
+    ASSERT_EQ(versions[3], Version(4, 4));
+}
+
 // Test that delete_rowsets_for_schema_change with empty input is a no-op
 TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
     auto rs = create_rowset(Version(0, 1));

