This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 65123a9484e branch-4.0: [fix](cloud) Delete local rowsets before add_rowsets in cloud schema change #62256 (#62310)
65123a9484e is described below

commit 65123a9484e201167af3ee117fe3ef0de5d62f0c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 15 21:12:42 2026 -0700

    branch-4.0: [fix](cloud) Delete local rowsets before add_rowsets in cloud schema change #62256 (#62310)
    
    Cherry-picked from #62256
    
    Co-authored-by: bobhan1 <[email protected]>
---
 be/src/cloud/cloud_schema_change_job.cpp |  23 +++
 be/src/cloud/cloud_tablet.cpp            |  28 ++++
 be/src/cloud/cloud_tablet.h              |   7 +
 be/test/cloud/cloud_tablet_test.cpp      | 248 +++++++++++++++++++++++++++++++
 4 files changed, 306 insertions(+)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index a3c9fd951ae..711bc158405 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -24,6 +24,7 @@
 #include <memory>
 #include <mutex>
 #include <random>
+#include <ranges>
 #include <thread>
 
 #include "cloud/cloud_meta_mgr.h"
@@ -540,6 +541,28 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
         std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
         std::unique_lock wlock(_new_tablet->get_header_lock());
+        // Mirror MS behavior: delete rowsets in [2, alter_version] before adding
+        // SC output rowsets to avoid stale compaction rowsets remaining visible.
+        {
+            int64_t alter_ver = sc_job->alter_version();
+            std::vector<RowsetSharedPtr> to_delete;
+            for (auto& [v, rs] : _new_tablet->rowset_map()) {
+                if (v.first >= 2 && v.second <= alter_ver) {
+                    to_delete.push_back(rs);
+                }
+            }
+            if (!to_delete.empty()) {
+                LOG_INFO(
+                        "schema change: delete {} local rowsets in [2, {}] before adding SC "
+                        "output, tablet_id={}, versions=[{}]",
+                        to_delete.size(), alter_ver, _new_tablet->tablet_id(),
+                        fmt::join(to_delete | std::views::transform([](const auto& rs) {
+                                      return rs->version().to_string();
+                                  }),
+                                  ", "));
+                _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+            }
+        }
         _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
         _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
         _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 19bc5b92791..db620468fcb 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -682,6 +682,34 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
     _tablet_meta->modify_rs_metas({}, rs_metas, false);
 }
 
+void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
+                                                   std::unique_lock<std::shared_mutex>&) {
+    if (to_delete.empty()) {
+        return;
+    }
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    rs_metas.reserve(to_delete.size());
+    for (auto&& rs : to_delete) {
+        rs_metas.push_back(rs->rowset_meta());
+        _rs_version_map.erase(rs->version());
+        // Remove edge from version graph so that the greedy capture algorithm
+        // won't prefer the wider stale compaction rowset over individual SC
+        // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
+        _timestamped_version_tracker.delete_version(rs->version());
+    }
+
+    // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
+    // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
+    // because SC output will create new rowsets with identical version ranges;
+    // a later compaction could put those into stale as well, causing two stale
+    // paths to reference the same version key -- when one path is cleaned first,
+    // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
+    _tablet_meta->modify_rs_metas({}, rs_metas, true);
+
+    // Schedule for direct cache cleanup. MS has already recycled these rowsets.
+    add_unused_rowsets(to_delete);
+}
+
 uint64_t CloudTablet::delete_expired_stale_rowsets() {
     if (config::enable_mow_verbose_log) {
         LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index b27bd7c5b55..5def9eabea2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -159,6 +159,13 @@ public:
     void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
                         std::unique_lock<std::shared_mutex>& meta_lock);
 
+    // Like delete_rowsets, but also removes edges from the version graph.
+    // Used by schema change to prevent the greedy capture algorithm from
+    // preferring stale compaction rowsets over individual SC output rowsets.
+    // MUST hold EXCLUSIVE `_meta_lock`.
+    void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
+                                          std::unique_lock<std::shared_mutex>& meta_lock);
+
     // When the tablet is dropped, we need to recycle cached data:
     // 1. The data in file cache
     // 2. The memory in tablet cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
index 904dc2e3fdf..356bbc2e040 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -997,4 +997,252 @@ TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) {
     EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
 }
 
+class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test {
+public:
+    CloudTabletDeleteRowsetsForSchemaChangeTest() : _engine(CloudStorageEngine(EngineOptions {})) {}
+
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                          TCompressionType::LZ4F));
+        _tablet =
+                std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*_tablet_meta));
+    }
+    void TearDown() override {}
+
+    RowsetSharedPtr create_rowset(Version version) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+        if (!st.ok()) {
+            return nullptr;
+        }
+        return rowset;
+    }
+
+protected:
+    TabletMetaSharedPtr _tablet_meta;
+    std::shared_ptr<CloudTablet> _tablet;
+    CloudStorageEngine _engine;
+};
+
+// Simulate the DORIS-25014 scenario:
+// - New tablet has compacted rowset [2-6] from compaction during SC
+// - SC produces individual output rowsets [2],[3],[4],[5],[6]
+// - Without the fix, add_rowsets fails to remove [2-6] because
+//   [2].contains([2-6]) = false
+// - With delete_rowsets_for_schema_change, the stale compaction rowset is
+//   removed from both _rs_version_map and version graph before add_rowsets
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompactionRowset) {
+    // Setup: add placeholder [0-1] and compacted rowset [2-6]
+    auto rs_placeholder = create_rowset(Version(0, 1));
+    auto rs_compacted = create_rowset(Version(2, 6));
+    ASSERT_NE(rs_placeholder, nullptr);
+    ASSERT_NE(rs_compacted, nullptr);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false);
+    }
+    // Verify initial state
+    ASSERT_EQ(_tablet->rowset_map().size(), 2);
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+
+    // SC produces individual rowsets
+    std::vector<RowsetSharedPtr> sc_output;
+    for (int v = 2; v <= 6; v++) {
+        auto rs = create_rowset(Version(v, v));
+        ASSERT_NE(rs, nullptr);
+        sc_output.push_back(rs);
+    }
+
+    // Simulate delete_rowsets_for_schema_change + add_rowsets
+    int64_t alter_version = 6;
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        // Collect rowsets in [2, alter_version]
+        std::vector<RowsetSharedPtr> to_delete;
+        for (auto& [v, rs] : _tablet->rowset_map()) {
+            if (v.first >= 2 && v.second <= alter_version) {
+                to_delete.push_back(rs);
+            }
+        }
+        ASSERT_EQ(to_delete.size(), 1); // only [2-6]
+        ASSERT_EQ(to_delete[0]->version(), Version(2, 6));
+
+        _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+        // [2-6] should be removed from rs_version_map
+        ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+        // Should NOT go to stale (to avoid stale path conflicts), but to unused
+        ASSERT_FALSE(_tablet->has_stale_rowsets());
+        ASSERT_TRUE(_tablet->need_remove_unused_rowsets());
+
+        _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+    }
+
+    // Verify: individual SC rowsets are now in rs_version_map
+    ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual
+    for (int v = 2; v <= 6; v++) {
+        ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v)))
+                << "Missing version " << v << "-" << v;
+    }
+    ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+
+    // Verify: capture_consistent_versions works correctly (no stale edges)
+    auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 6), {});
+    ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+    auto& versions = versions_result.value();
+    ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6]
+    ASSERT_EQ(versions[0], Version(0, 1));
+    for (int i = 0; i < 5; i++) {
+        ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+    }
+}
+
+// Test that delete_rowsets_for_schema_change with empty input is a no-op
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs}, false, wlock, false);
+    }
+    ASSERT_EQ(_tablet->rowset_map().size(), 1);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->delete_rowsets_for_schema_change({}, wlock);
+    }
+    ASSERT_EQ(_tablet->rowset_map().size(), 1);
+    ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
+// Test with multiple compaction rowsets spanning different version ranges
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestMultipleCompactionRowsets) {
+    auto rs_placeholder = create_rowset(Version(0, 1));
+    auto rs_comp1 = create_rowset(Version(2, 5));
+    auto rs_comp2 = create_rowset(Version(6, 10));
+    auto rs_post = create_rowset(Version(11, 11)); // after alter_version, should NOT be deleted
+    ASSERT_NE(rs_placeholder, nullptr);
+    ASSERT_NE(rs_comp1, nullptr);
+    ASSERT_NE(rs_comp2, nullptr);
+    ASSERT_NE(rs_post, nullptr);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post}, false, wlock, false);
+    }
+    ASSERT_EQ(_tablet->rowset_map().size(), 4);
+
+    // SC output: individual rowsets for versions 2-10
+    std::vector<RowsetSharedPtr> sc_output;
+    for (int v = 2; v <= 10; v++) {
+        auto rs = create_rowset(Version(v, v));
+        ASSERT_NE(rs, nullptr);
+        sc_output.push_back(rs);
+    }
+
+    int64_t alter_version = 10;
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        std::vector<RowsetSharedPtr> to_delete;
+        for (auto& [v, rs] : _tablet->rowset_map()) {
+            if (v.first >= 2 && v.second <= alter_version) {
+                to_delete.push_back(rs);
+            }
+        }
+        ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10]
+
+        _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+        // Post-alter rowset should survive
+        ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11)));
+        ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5)));
+        ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10)));
+
+        _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+    }
+
+    // Verify: [0-1], [2],[3],...,[10], [11-11]
+    ASSERT_EQ(_tablet->rowset_map().size(), 11);
+
+    // Verify capture
+    auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 11), {});
+    ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+    auto& versions = versions_result.value();
+    ASSERT_EQ(versions.size(), 11);
+    ASSERT_EQ(versions[0], Version(0, 1));
+    for (int i = 0; i < 9; i++) {
+        ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+    }
+    ASSERT_EQ(versions[10], Version(11, 11));
+}
+
+// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then
+// compaction creates a new stale path with overlapping version keys. When
+// one stale path is cleaned, the other hits DCHECK(false) because the
+// version is already removed from _stale_rs_version_map.
+// With the fix (bypassing stale tracking), this should not happen.
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestNoStalePathConflictWithCompaction) {
+    // Setup: [0-1] placeholder, [2-6] compaction product during SC
+    auto rs_placeholder = create_rowset(Version(0, 1));
+    auto rs_compacted = create_rowset(Version(2, 6));
+    ASSERT_NE(rs_placeholder, nullptr);
+    ASSERT_NE(rs_compacted, nullptr);
+
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false);
+    }
+
+    // SC output: individual rowsets [2],[3],[4],[5],[6]
+    std::vector<RowsetSharedPtr> sc_output;
+    for (int v = 2; v <= 6; v++) {
+        sc_output.push_back(create_rowset(Version(v, v)));
+    }
+
+    // Step 1: delete_rowsets_for_schema_change + add SC output
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        _tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock);
+        _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+    }
+    // Stale should be empty — SC delete bypasses stale tracking
+    ASSERT_FALSE(_tablet->has_stale_rowsets());
+
+    // Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6]
+    auto rs_new_compacted = create_rowset(Version(2, 6));
+    std::vector<RowsetSharedPtr> compaction_input;
+    {
+        std::unique_lock wlock(_tablet->get_header_lock());
+        for (auto& [v, rs] : _tablet->rowset_map()) {
+            if (v.first >= 2 && v.second <= 6) {
+                compaction_input.push_back(rs);
+            }
+        }
+        ASSERT_EQ(compaction_input.size(), 5);
+        // Normal compaction delete_rowsets — this WILL use stale tracking
+        _tablet->delete_rowsets(compaction_input, wlock);
+        _tablet->add_rowsets({rs_new_compacted}, false, wlock, false);
+    }
+    // Now stale has the compaction inputs
+    ASSERT_TRUE(_tablet->has_stale_rowsets());
+
+    // Step 3: delete_expired_stale_rowsets — this is where CI crashed
+    // With old code: stale path from SC and compaction both reference [2-6] key,
+    // causing DCHECK(false). With fix: only compaction stale path exists, no conflict.
+    config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately
+    ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets());
+
+    // Verify final state: [0-1] and [2-6] active, no stale left
+    ASSERT_EQ(_tablet->rowset_map().size(), 2);
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1)));
+    ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+    ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to