This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0e477b6c8dc [opt](rowset) Remote fetch rowsets to avoid -230 error 
when capturing rowsets (#52440)
0e477b6c8dc is described below

commit 0e477b6c8dc5ddf699bc86c32d9ab646871a0f40
Author: Siyang Tang <[email protected]>
AuthorDate: Mon Jul 7 11:04:12 2025 +0800

    [opt](rowset) Remote fetch rowsets to avoid -230 error when capturing 
rowsets (#52440)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    Make stale rowsets accessible across BEs to avoid E-230 (versions
    already merged) in Read-Write Splitting scenario.
---
 be/src/cloud/cloud_base_compaction.cpp             |   2 +-
 be/src/cloud/cloud_cumulative_compaction.cpp       |  10 +-
 be/src/cloud/cloud_full_compaction.cpp             |  10 +-
 be/src/cloud/cloud_meta_mgr.cpp                    |   2 +-
 be/src/cloud/cloud_schema_change_job.cpp           |  26 +-
 be/src/cloud/cloud_tablet.cpp                      |  45 +-
 be/src/cloud/cloud_tablet.h                        |   3 -
 be/src/cloud/cloud_tablet_mgr.cpp                  |  20 +-
 be/src/cloud/cloud_tablet_mgr.h                    |   2 +-
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/http/action/delete_bitmap_action.cpp        |   6 +-
 be/src/olap/base_tablet.cpp                        |  47 +--
 be/src/olap/base_tablet.h                          |  43 +-
 be/src/olap/compaction.cpp                         |   6 +-
 be/src/olap/data_dir.cpp                           |   6 +-
 be/src/olap/full_compaction.cpp                    |   8 +-
 be/src/olap/merger.cpp                             |   8 +-
 be/src/olap/parallel_scanner_builder.cpp           |  21 +-
 be/src/olap/parallel_scanner_builder.h             |   8 +-
 be/src/olap/rowset/rowset_reader_context.h         |   2 +-
 be/src/olap/rowset_version_mgr.cpp                 | 452 +++++++++++++++++++++
 be/src/olap/schema_change.cpp                      |  22 +-
 be/src/olap/snapshot_manager.cpp                   |  21 +-
 be/src/olap/tablet.cpp                             |  94 +----
 be/src/olap/tablet.h                               |   9 -
 be/src/olap/tablet_manager.cpp                     |   2 +-
 be/src/olap/tablet_meta.cpp                        |  54 ++-
 be/src/olap/tablet_meta.h                          |  14 +-
 be/src/olap/tablet_reader.cpp                      |  15 +-
 be/src/olap/tablet_reader.h                        |  14 +-
 be/src/olap/task/engine_checksum_task.cpp          |  13 +-
 be/src/olap/task/engine_clone_task.cpp             |   2 +-
 be/src/olap/task/engine_storage_migration_task.cpp |  10 +-
 be/src/olap/task/index_builder.cpp                 |   4 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   9 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   2 +-
 be/src/service/internal_service.cpp                |  67 +++
 be/src/service/internal_service.h                  |   5 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  24 +-
 be/src/vec/exec/scan/new_olap_scanner.h            |   2 +-
 be/test/olap/delta_writer_test.cpp                 |   4 +-
 be/test/olap/segcompaction_mow_test.cpp            |   2 +-
 be/test/olap/tablet_meta_test.cpp                  |  18 +-
 be/test/olap/test_data/rowset_meta.json            |   4 +
 .../apache/doris/cloud/catalog/CloudReplica.java   |  16 +
 .../apache/doris/service/FrontendServiceImpl.java  |  28 +-
 gensrc/proto/internal_service.proto                |  16 +
 .../cloud/test_cloud_version_already_merged.out    | Bin 0 -> 199 bytes
 regression-test/pipeline/p0/conf/be.conf           |   2 +
 .../cloud/test_cloud_version_already_merged.groovy | 126 ++++++
 51 files changed, 1010 insertions(+), 318 deletions(-)

diff --git a/be/src/cloud/cloud_base_compaction.cpp 
b/be/src/cloud/cloud_base_compaction.cpp
index 3b861e1c944..a1f57d8ce3d 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -419,7 +419,7 @@ Status CloudBaseCompaction::modify_rowsets() {
         // the tablet to be unable to synchronize the rowset meta changes 
generated by cumu compaction.
         cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
         if (output_rowset_delete_bitmap) {
-            
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+            
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
         }
         if (stats.cumulative_compaction_cnt() >= 
cloud_tablet()->cumulative_compaction_cnt()) {
             cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 39954097324..71234470298 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -376,7 +376,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         
cloud_tablet()->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
         cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
         if (output_rowset_delete_bitmap) {
-            
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+            
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
         }
         if (stats.base_compaction_cnt() >= 
cloud_tablet()->base_compaction_cnt()) {
             cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
@@ -416,7 +416,7 @@ Status 
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
                 rowset->rowset_id().to_string();
                 DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
                 DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, 
pre_max_version};
-                auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
+                auto d = _tablet->tablet_meta()->delete_bitmap()->get_agg(
                         {rowset->rowset_id(), seg_id, pre_max_version});
                 
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
                 if (d->isEmpty()) {
@@ -440,10 +440,10 @@ Status 
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
                             _input_rowsets.back()->end_version());
             for (auto it = new_delete_bitmap->delete_bitmap.begin();
                  it != new_delete_bitmap->delete_bitmap.end(); it++) {
-                _tablet->tablet_meta()->delete_bitmap().set(it->first, 
it->second);
+                _tablet->tablet_meta()->delete_bitmap()->set(it->first, 
it->second);
             }
-            
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
-                                                                        
to_remove_vec);
+            
_tablet->tablet_meta()->delete_bitmap()->add_to_remove_queue(version.to_string(),
+                                                                         
to_remove_vec);
             DBUG_EXECUTE_IF(
                     
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
                     { 
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index 6bfab2ec698..7358f6d1915 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "cpp/sync_point.h"
 #include "gen_cpp/cloud.pb.h"
+#include "olap/base_tablet.h"
 #include "olap/compaction.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/tablet_meta.h"
@@ -273,7 +274,7 @@ Status CloudFullCompaction::modify_rowsets() {
         cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
         cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
         if (output_rowset_delete_bitmap) {
-            
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+            
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
         }
         if (stats.cumulative_compaction_cnt() >= 
cloud_tablet()->cumulative_compaction_cnt()) {
             cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
@@ -340,8 +341,9 @@ Status 
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
     int64_t max_version = cloud_tablet()->max_version().second;
     DCHECK(max_version >= _output_rowset->version().second);
     if (max_version > _output_rowset->version().second) {
-        RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked(
-                {_output_rowset->version().second + 1, max_version}, 
&tmp_rowsets));
+        auto ret = 
DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked(
+                {_output_rowset->version().second + 1, max_version}, 
CaptureRowsetOps {}));
+        tmp_rowsets = std::move(ret.rowsets);
     }
     for (const auto& it : tmp_rowsets) {
         int64_t cur_version = it->rowset_meta()->start_version();
@@ -372,7 +374,7 @@ Status 
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
             .tag("input_segments", _input_segments)
             .tag("input_rowsets_total_size", _input_rowsets_total_size)
             .tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
-    _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
+    _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap);
     return Status::OK();
 }
 
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index dff85b3f639..684d89cb01b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -577,7 +577,7 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
                         .error(st);
                 return st;
             }
-            tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
+            tablet->tablet_meta()->delete_bitmap()->merge(delete_bitmap);
             if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() 
&&
                 delete_bitmap.cardinality() > 0) {
                 std::vector<std::string> new_rowset_msgs;
diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index a5707e51bb6..405dcbe1a0d 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -27,6 +27,7 @@
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "common/status.h"
+#include "olap/base_tablet.h"
 #include "olap/delete_handler.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
@@ -186,7 +187,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
     reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
     reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
     reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
-    reader_context.delete_bitmap = 
&_base_tablet->tablet_meta()->delete_bitmap();
+    reader_context.delete_bitmap = 
_base_tablet->tablet_meta()->delete_bitmap();
     reader_context.version = Version(0, start_resp.alter_version());
 
     for (auto& split : rs_splits) {
@@ -457,7 +458,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t 
alter_version,
             .tag("alter_version", alter_version);
     
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
 initiator));
     TabletMetaSharedPtr tmp_meta = 
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
-    tmp_meta->delete_bitmap().delete_bitmap.clear();
+    tmp_meta->delete_bitmap()->delete_bitmap.clear();
     std::shared_ptr<CloudTablet> tmp_tablet =
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
@@ -466,22 +467,21 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     }
 
     // step 1, process incremental rowset without delete bitmap update lock
-    std::vector<RowsetSharedPtr> incremental_rowsets;
     
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
     int64_t max_version = tmp_tablet->max_version().second;
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets without lock, version: " << 
start_calc_delete_bitmap_version
               << "-" << max_version << " new_table_id: " << 
_new_tablet->tablet_id();
     if (max_version >= start_calc_delete_bitmap_version) {
-        RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
-                {start_calc_delete_bitmap_version, max_version}, 
&incremental_rowsets));
+        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+                {start_calc_delete_bitmap_version, max_version}, 
CaptureRowsetOps {}));
         
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
                         DBUG_BLOCK);
         {
             std::unique_lock wlock(tmp_tablet->get_header_lock());
             tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
         }
-        for (auto rowset : incremental_rowsets) {
+        for (auto rowset : ret.rowsets) {
             
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, 
rowset));
         }
     }
@@ -497,15 +497,14 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
               << "incremental rowsets with lock, version: " << max_version + 1 
<< "-"
               << new_max_version << " new_tablet_id: " << 
_new_tablet->tablet_id();
-    std::vector<RowsetSharedPtr> new_incremental_rowsets;
     if (new_max_version > max_version) {
-        RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
-                {max_version + 1, new_max_version}, &new_incremental_rowsets));
+        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+                {max_version + 1, new_max_version}, CaptureRowsetOps {}));
         {
             std::unique_lock wlock(tmp_tablet->get_header_lock());
             tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
         }
-        for (auto rowset : new_incremental_rowsets) {
+        for (auto rowset : ret.rowsets) {
             
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, 
rowset));
         }
     }
@@ -522,13 +521,14 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
         }
     });
 
-    auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
+    auto delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
 
     // step4, store delete bitmap
     RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
-            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, 
&delete_bitmap));
+            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, 
delete_bitmap.get()));
 
-    _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
+    auto original_dbm = _new_tablet->tablet_meta()->delete_bitmap();
+    *original_dbm = std::move(*delete_bitmap);
     return Status::OK();
 }
 
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 78663de6ed8..85b60e1588a 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -37,6 +37,7 @@
 #include "common/logging.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
+#include "olap/base_tablet.h"
 #include "olap/compaction.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/olap_define.h"
@@ -69,23 +70,6 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
     return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
 }
 
-Status CloudTablet::capture_consistent_rowsets_unlocked(
-        const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) 
const {
-    Versions version_path;
-    auto st = 
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
&version_path);
-    if (!st.ok()) {
-        // Check no missed versions or req version is merged
-        auto missed_versions = get_missed_versions(spec_version.second);
-        if (missed_versions.empty()) {
-            st.set_code(VERSION_ALREADY_MERGED); // Reset error code
-        }
-        st.append(" tablet_id=" + std::to_string(tablet_id()));
-        return st;
-    }
-    VLOG_DEBUG << "capture consitent versions: " << version_path;
-    return _capture_consistent_rowsets_unlocked(version_path, rowsets);
-}
-
 std::string CloudTablet::tablet_path() const {
     return "";
 }
@@ -97,25 +81,10 @@ Status CloudTablet::capture_rs_readers(const Version& 
spec_version,
         LOG_WARNING("CloudTablet.capture_rs_readers.return 
e-230").tag("tablet_id", tablet_id());
         return Status::Error<false>(-230, "injected error");
     });
-    Versions version_path;
     std::shared_lock rlock(_meta_lock);
-    auto st = 
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
&version_path);
-    if (!st.ok()) {
-        rlock.unlock(); // avoid logging in lock range
-        // Check no missed versions or req version is merged
-        auto missed_versions = get_missed_versions(spec_version.second);
-        if (missed_versions.empty()) {
-            st.set_code(VERSION_ALREADY_MERGED); // Reset error code
-            st.append(" versions are already compacted, ");
-        }
-        st.append(" tablet_id=" + std::to_string(tablet_id()));
-        // clang-format off
-        LOG(WARNING) << st << '\n' << [this]() { std::string json; 
get_compaction_status(&json); return json; }();
-        // clang-format on
-        return st;
-    }
-    VLOG_DEBUG << "capture consitent versions: " << version_path;
-    return capture_rs_readers_unlocked(version_path, rs_splits);
+    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+            spec_version, CaptureRowsetOps {.skip_missing_versions = 
skip_missing_version}));
+    return Status::OK();
 }
 
 Status CloudTablet::merge_rowsets_schema() {
@@ -469,7 +438,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
         }
         _reconstruct_version_tracker_if_necessary();
     }
-    
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);
+    
_tablet_meta->delete_bitmap()->remove_stale_delete_bitmap_from_queue(version_to_delete);
     recycle_cached_data(expired_rowsets);
     if (config::enable_mow_verbose_log) {
         LOG_INFO("finish delete_expired_stale_rowset for tablet={}", 
tablet_id());
@@ -977,7 +946,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
     std::size_t missed_rows_size = 0;
     calc_compaction_output_rowset_delete_bitmap(
             input_rowsets, rowid_conversion, 0, version.second + 1, 
missed_rows.get(),
-            location_map.get(), tablet_meta()->delete_bitmap(), 
output_rowset_delete_bitmap.get());
+            location_map.get(), *tablet_meta()->delete_bitmap(), 
output_rowset_delete_bitmap.get());
     if (missed_rows) {
         missed_rows_size = missed_rows->size();
         if (!allow_delete_in_cumu_compaction) {
@@ -1013,7 +982,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
 
     calc_compaction_output_rowset_delete_bitmap(
             input_rowsets, rowid_conversion, version.second, UINT64_MAX, 
missed_rows.get(),
-            location_map.get(), tablet_meta()->delete_bitmap(), 
output_rowset_delete_bitmap.get());
+            location_map.get(), *tablet_meta()->delete_bitmap(), 
output_rowset_delete_bitmap.get());
     int64_t t4 = MonotonicMicros();
     if (location_map) {
         RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 2dd1d3c4425..dc357eb7249 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -55,9 +55,6 @@ public:
     Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                               bool skip_missing_version) override;
 
-    Status capture_consistent_rowsets_unlocked(
-            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const override;
-
     size_t tablet_footprint() override {
         return _approximate_data_size.load(std::memory_order_relaxed);
     }
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index 25f50f7ef4b..1f155635c65 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -25,6 +25,7 @@
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/config.h"
+#include "common/logging.h"
 #include "common/status.h"
 #include "olap/lru_cache.h"
 #include "runtime/memory/cache_policy.h"
@@ -158,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
 Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t 
tablet_id, bool warmup_data,
                                                                 bool 
sync_delete_bitmap,
                                                                 
SyncRowsetStats* sync_stats,
-                                                                bool 
force_use_cache) {
+                                                                bool 
local_only) {
     // LRU value type. `Value`'s lifetime MUST NOT be longer than 
`CloudTabletMgr`
     class Value : public LRUCacheValueBase {
     public:
@@ -176,12 +177,17 @@ Result<std::shared_ptr<CloudTablet>> 
CloudTabletMgr::get_tablet(int64_t tablet_i
     CacheKey key(tablet_id_str);
     auto* handle = _cache->lookup(key);
 
-    if (handle == nullptr && force_use_cache) {
-        return ResultError(
-                Status::InternalError("failed to get cloud tablet from cache 
{}", tablet_id));
-    }
-
     if (handle == nullptr) {
+        if (local_only) {
+            LOG(INFO) << "tablet=" << tablet_id
+                      << "does not exists in local tablet cache, because param 
local_only=true, "
+                         "treat it as an error";
+            return ResultError(Status::InternalError(
+                    "tablet={} does not exists in local tablet cache, because 
param "
+                    "local_only=true, "
+                    "treat it as an error",
+                    tablet_id));
+        }
         if (sync_stats) {
             ++sync_stats->tablet_meta_cache_miss;
         }
@@ -485,7 +491,7 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
         auto t = tablet_wk.lock();
         if (!t) return;
         uint64_t delete_bitmap_count =
-                
t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
+                
t.get()->tablet_meta()->delete_bitmap()->get_delete_bitmap_count();
         total_delete_map_count += delete_bitmap_count;
         if (delete_bitmap_count > *max_delete_bitmap_score) {
             max_delete_bitmap_score_tablet_id = t->tablet_id();
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h
index a1ce6d2b8cf..2e0ea79f424 100644
--- a/be/src/cloud/cloud_tablet_mgr.h
+++ b/be/src/cloud/cloud_tablet_mgr.h
@@ -47,7 +47,7 @@ public:
     Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool 
warmup_data = false,
                                                     bool sync_delete_bitmap = 
true,
                                                     SyncRowsetStats* 
sync_stats = nullptr,
-                                                    bool force_use_cache = 
false);
+                                                    bool local_only = false);
 
     void erase_tablet(int64_t tablet_id);
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0f9a60aec88..dc81ca48373 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1515,6 +1515,7 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory, 
"true");
 
 DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
 
+DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
 // the max length of segments key bounds, in bytes
 // ATTENTION: as long as this conf has ever been enabled, cluster downgrade 
and backup recovery will no longer be supported.
 DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index af916f1f7bd..a1820494725 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1589,6 +1589,7 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory);
 
 DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);
 
+DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
 // the max length of segments key bounds, in bytes
 // ATTENTION: as long as this conf has ever been enabled, cluster downgrade 
and backup recovery will no longer be supported.
 DECLARE_mInt32(segments_key_bounds_truncation_threshold);
diff --git a/be/src/http/action/delete_bitmap_action.cpp 
b/be/src/http/action/delete_bitmap_action.cpp
index 59783d1c055..5fc4d0f4388 100644
--- a/be/src/http/action/delete_bitmap_action.cpp
+++ b/be/src/http/action/delete_bitmap_action.cpp
@@ -159,7 +159,7 @@ Status 
DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* r
     if (tablet == nullptr) {
         return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
     }
-    auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+    auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot();
     _show_delete_bitmap(dm, verbose, json_result);
     return Status::OK();
 }
@@ -183,7 +183,7 @@ Status 
DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
         LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
         return st;
     }
-    auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+    auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot();
     _show_delete_bitmap(dm, verbose, json_result);
     return Status::OK();
 }
@@ -210,4 +210,4 @@ void DeleteBitmapAction::handle(HttpRequest* req) {
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 1aa74582d84..56ffc2d100e 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -516,7 +516,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
TabletSchema* latest
             if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
                 return s;
             }
-            if (s.ok() && 
_tablet_meta->delete_bitmap().contains_agg_without_cache(
+            if (s.ok() && 
_tablet_meta->delete_bitmap()->contains_agg_without_cache(
                                   {loc.rowset_id, loc.segment_id, version}, 
loc.row_id)) {
                 // if has sequence col, we continue to compare the sequence_id 
of
                 // all rowsets, util we find an existing key.
@@ -1114,37 +1114,6 @@ void BaseTablet::_rowset_ids_difference(const 
RowsetIdUnorderedSet& cur,
     }
 }
 
-Status BaseTablet::_capture_consistent_rowsets_unlocked(
-        const std::vector<Version>& version_path, 
std::vector<RowsetSharedPtr>* rowsets) const {
-    DCHECK(rowsets != nullptr);
-    rowsets->reserve(version_path.size());
-    for (const auto& version : version_path) {
-        bool is_find = false;
-        do {
-            auto it = _rs_version_map.find(version);
-            if (it != _rs_version_map.end()) {
-                is_find = true;
-                rowsets->push_back(it->second);
-                break;
-            }
-
-            auto it_expired = _stale_rs_version_map.find(version);
-            if (it_expired != _stale_rs_version_map.end()) {
-                is_find = true;
-                rowsets->push_back(it_expired->second);
-                break;
-            }
-        } while (false);
-
-        if (!is_find) {
-            return Status::Error<CAPTURE_ROWSET_ERROR>(
-                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
-                    version.to_string());
-        }
-    }
-    return Status::OK();
-}
-
 Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr 
delete_bitmap,
                                                    int64_t max_version, 
int64_t txn_id,
                                                    const RowsetIdUnorderedSet& 
rowset_ids,
@@ -1564,7 +1533,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
         delete_bitmap->remove_sentinel_marks();
     }
     for (auto& iter : delete_bitmap->delete_bitmap) {
-        self->_tablet_meta->delete_bitmap().merge(
+        self->_tablet_meta->delete_bitmap()->merge(
                 {std::get<0>(iter.first), std::get<1>(iter.first), 
cur_version}, iter.second);
     }
 
@@ -1723,7 +1692,7 @@ void BaseTablet::get_base_rowset_delete_bitmap_count(
             }
             base_found = true;
             uint64_t base_rowset_delete_bitmap_count =
-                    this->tablet_meta()->delete_bitmap().get_count_with_range(
+                    this->tablet_meta()->delete_bitmap()->get_count_with_range(
                             {rowset->rowset_id(), 0, 0},
                             {rowset->rowset_id(), UINT32_MAX, UINT64_MAX});
             if (base_rowset_delete_bitmap_count > 
*max_base_rowset_delete_bitmap_score) {
@@ -1737,6 +1706,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count(
     }
 }
 
+void TabletReadSource::fill_delete_predicates() {
+    DCHECK_EQ(delete_predicates.size(), 0);
+    auto delete_pred_view =
+            rs_splits | std::views::transform([](auto&& split) {
+                return split.rs_reader->rowset()->rowset_meta();
+            }) |
+            std::views::filter([](const auto& rs_meta) { return 
rs_meta->has_delete_predicate(); });
+    delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()};
+}
+
 int32_t BaseTablet::max_version_config() {
     int32_t max_version = tablet_meta()->compaction_policy() == 
CUMULATIVE_TIME_SERIES_POLICY
                                   ? config::time_series_max_tablet_version_num
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 9dd69d0bd9a..6c797e0478b 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -42,6 +42,9 @@ class SegmentCacheHandle;
 class RowIdConversion;
 struct PartialUpdateInfo;
 class PartialUpdateReadPlan;
+struct CaptureRowsetOps;
+struct CaptureRowsetResult;
+struct TabletReadSource;
 
 struct TabletWithVersion {
     BaseTabletSPtr tablet;
@@ -107,9 +110,6 @@ public:
     virtual Result<std::unique_ptr<RowsetWriter>> 
create_rowset_writer(RowsetWriterContext& context,
                                                                        bool 
vertical) = 0;
 
-    virtual Status capture_consistent_rowsets_unlocked(
-            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const = 0;
-
     virtual Status capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       bool skip_missing_version) = 0;
@@ -309,6 +309,20 @@ public:
         return Status::OK();
     }
 
+    [[nodiscard]] Result<CaptureRowsetResult> 
capture_consistent_rowsets_unlocked(
+            const Version& version_range, const CaptureRowsetOps& options) 
const;
+
+    [[nodiscard]] Result<std::vector<Version>> 
capture_consistent_versions_unlocked(
+            const Version& version_range, const CaptureRowsetOps& options) 
const;
+
+    [[nodiscard]] Result<std::vector<RowSetSplits>> 
capture_rs_readers_unlocked(
+            const Version& version_range, const CaptureRowsetOps& options) 
const;
+
+    [[nodiscard]] Result<TabletReadSource> capture_read_source(const Version& 
version_range,
+                                                               const 
CaptureRowsetOps& options);
+
+    Result<CaptureRowsetResult> _remote_capture_rowsets(const Version& 
version_range) const;
+
 protected:
     // Find the missed versions until the spec_version.
     //
@@ -326,9 +340,6 @@ protected:
                                        const RowsetIdUnorderedSet& pre,
                                        RowsetIdUnorderedSet* to_add, 
RowsetIdUnorderedSet* to_del);
 
-    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& 
version_path,
-                                                std::vector<RowsetSharedPtr>* 
rowsets) const;
-
     Status sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block);
 
     mutable std::shared_mutex _meta_lock;
@@ -369,4 +380,24 @@ public:
     Status last_compaction_status = Status::OK();
 };
 
+// Options that tune how versions / rowsets are captured from a tablet.
+struct CaptureRowsetOps {
+    // When versions are missing: false -> the capture fails with the tracker's error,
+    // true -> a warning is logged and the capture carries on with what was found.
+    bool skip_missing_versions = false;
+    // Suppress the warning logs emitted when a consistent version path cannot be captured.
+    bool quiet = false;
+    // Also look a version up among the stale rowsets when it is not an active rowset.
+    bool include_stale_rowsets = true;
+    // In cloud mode, fall back to fetching the rowsets from peer BEs when the local
+    // capture fails.
+    bool enable_fetch_rowsets_from_peers = false;
+};
+
+// Outcome of capturing the rowsets that cover a version range.
+struct CaptureRowsetResult {
+    // The captured rowsets, one per version on the captured path.
+    std::vector<RowsetSharedPtr> rowsets;
+    // Set only for unique-key merge-on-write tablets; left null otherwise.
+    std::shared_ptr<DeleteBitmap> delete_bitmap;
+};
+
+// Everything a reader needs to scan a tablet for one captured version range.
+struct TabletReadSource {
+    // One split (rowset reader) per captured rowset.
+    std::vector<RowSetSplits> rs_splits;
+    // Metas of the rowsets that carry a delete predicate; see fill_delete_predicates().
+    std::vector<RowsetMetaSharedPtr> delete_predicates;
+    // Delete bitmap handed over by the rowset capture; may be null.
+    std::shared_ptr<DeleteBitmap> delete_bitmap;
+    // Fill delete predicates with `rs_splits`
+    void fill_delete_predicates();
+};
+
 } /* namespace doris */
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 7e312c5847f..77af2b30fe1 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1069,7 +1069,7 @@ Status CompactionMixin::modify_rowsets() {
         std::size_t missed_rows_size = 0;
         tablet()->calc_compaction_output_rowset_delete_bitmap(
                 _input_rowsets, *_rowid_conversion, 0, version.second + 1, 
missed_rows.get(),
-                location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
+                location_map.get(), *_tablet->tablet_meta()->delete_bitmap(),
                 &output_rowset_delete_bitmap);
         if (missed_rows) {
             missed_rows_size = missed_rows->size();
@@ -1112,7 +1112,7 @@ Status CompactionMixin::modify_rowsets() {
                     ss << ", debug info: ";
                     DeleteBitmap subset_map(_tablet->tablet_id());
                     for (auto rs : _input_rowsets) {
-                        _tablet->tablet_meta()->delete_bitmap().subset(
+                        _tablet->tablet_meta()->delete_bitmap()->subset(
                                 {rs->rowset_id(), 0, 0},
                                 {rs->rowset_id(), rs->num_segments(), 
version.second + 1},
                                 &subset_map);
@@ -1187,7 +1187,7 @@ Status CompactionMixin::modify_rowsets() {
             // incremental data.
             tablet()->calc_compaction_output_rowset_delete_bitmap(
                     _input_rowsets, *_rowid_conversion, version.second, 
UINT64_MAX,
-                    missed_rows.get(), location_map.get(), 
_tablet->tablet_meta()->delete_bitmap(),
+                    missed_rows.get(), location_map.get(), 
*_tablet->tablet_meta()->delete_bitmap(),
                     &output_rowset_delete_bitmap);
 
             if (missed_rows) {
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 5cb2a6105d9..6389f1ad133 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -635,14 +635,14 @@ Status DataDir::load() {
             }
             ++dbm_cnt;
             auto seg_id = delete_bitmap_pb.segment_ids(i);
-            auto iter = 
tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
+            auto iter = 
tablet->tablet_meta()->delete_bitmap()->delete_bitmap.find(
                     {rst_id, seg_id, version});
             // This version of delete bitmap already exists
-            if (iter != 
tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
+            if (iter != 
tablet->tablet_meta()->delete_bitmap()->delete_bitmap.end()) {
                 continue;
             }
             auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
-            tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, 
seg_id, version}] =
+            tablet->tablet_meta()->delete_bitmap()->delete_bitmap[{rst_id, 
seg_id, version}] =
                     roaring::Roaring::read(bitmap);
         }
         return true;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 06641e7a22c..7d3c497f2a2 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -27,6 +27,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "olap/base_tablet.h"
 #include "olap/compaction.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/olap_common.h"
@@ -143,8 +144,9 @@ Status FullCompaction::modify_rowsets() {
         int64_t max_version = tablet()->max_version().second;
         DCHECK(max_version >= _output_rowset->version().second);
         if (max_version > _output_rowset->version().second) {
-            RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked(
-                    {_output_rowset->version().second + 1, max_version}, 
&tmp_rowsets));
+            auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+                    {_output_rowset->version().second + 1, max_version}, 
CaptureRowsetOps {}));
+            tmp_rowsets = std::move(ret.rowsets);
         }
 
         for (const auto& it : tmp_rowsets) {
@@ -219,7 +221,7 @@ Status 
FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
     for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
         if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
-            _tablet->tablet_meta()->delete_bitmap().merge(
+            _tablet->tablet_meta()->delete_bitmap()->merge(
                     {std::get<0>(k), std::get<1>(k), cur_version}, v);
         }
     }
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 01f36d77dc2..c3f59872f62 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -68,7 +68,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, 
ReaderType reader_type,
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
 
-    TabletReader::ReadSource read_source;
+    TabletReadSource read_source;
     read_source.rs_splits.reserve(src_rowset_readers.size());
     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
         read_source.rs_splits.emplace_back(rs_reader);
@@ -87,7 +87,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, 
ReaderType reader_type,
     }
     reader_params.tablet_schema = merge_tablet_schema;
     if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
-        reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+        reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap();
     }
 
     if (stats_output && stats_output->rowid_conversion) {
@@ -249,7 +249,7 @@ Status Merger::vertical_compact_one_group(
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
 
-    TabletReader::ReadSource read_source;
+    TabletReadSource read_source;
     read_source.rs_splits.reserve(src_rowset_readers.size());
     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
         read_source.rs_splits.emplace_back(rs_reader);
@@ -268,7 +268,7 @@ Status Merger::vertical_compact_one_group(
 
     reader_params.tablet_schema = merge_tablet_schema;
     if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
-        reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+        reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap();
     }
 
     if (is_key && stats_output && stats_output->rowid_conversion) {
diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index 103e6341d7c..f4cd210bbd2 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -57,7 +57,7 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
         // `rs_splits` in `entire read source` will be devided into several 
partitial read sources
         // to build several parallel scanners, based on segment rows number. 
All the partitial read sources
         // share the same delete predicates from their corresponding entire 
read source.
-        TabletReader::ReadSource partitial_read_source;
+        TabletReadSource partitial_read_source;
         int64_t rows_collected = 0;
         for (auto& rs_split : entire_read_source.rs_splits) {
             auto reader = rs_split.rs_reader;
@@ -106,10 +106,11 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
 
                         
partitial_read_source.rs_splits.emplace_back(std::move(split));
 
-                        scanners.emplace_back(
-                                _build_scanner(tablet, version, _key_ranges,
-                                               
{std::move(partitial_read_source.rs_splits),
-                                                
entire_read_source.delete_predicates}));
+                        scanners.emplace_back(_build_scanner(
+                                tablet, version, _key_ranges,
+                                {.rs_splits = 
std::move(partitial_read_source.rs_splits),
+                                 .delete_predicates = 
entire_read_source.delete_predicates,
+                                 .delete_bitmap = 
entire_read_source.delete_bitmap}));
 
                         partitial_read_source = {};
                         split = RowSetSplits(reader->clone());
@@ -150,9 +151,11 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
                           split.segment_offsets.second - 
split.segment_offsets.first);
             }
 #endif
-            scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
-                                                 
{std::move(partitial_read_source.rs_splits),
-                                                  
entire_read_source.delete_predicates}));
+            scanners.emplace_back(
+                    _build_scanner(tablet, version, _key_ranges,
+                                   {.rs_splits = 
std::move(partitial_read_source.rs_splits),
+                                    .delete_predicates = 
entire_read_source.delete_predicates,
+                                    .delete_bitmap = 
entire_read_source.delete_bitmap}));
         }
     }
 
@@ -199,7 +202,7 @@ Status ParallelScannerBuilder::_load() {
 
 std::shared_ptr<NewOlapScanner> ParallelScannerBuilder::_build_scanner(
         BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
-        TabletReader::ReadSource&& read_source) {
+        TabletReadSource&& read_source) {
     NewOlapScanner::Params params {_state,  _scanner_profile.get(), 
key_ranges, std::move(tablet),
                                    version, std::move(read_source), _limit,    
 _is_preaggregation};
     return NewOlapScanner::create_shared(_parent, std::move(params));
diff --git a/be/src/olap/parallel_scanner_builder.h 
b/be/src/olap/parallel_scanner_builder.h
index 1f371e3129a..a746ff5ba5d 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -44,7 +44,7 @@ class ParallelScannerBuilder {
 public:
     ParallelScannerBuilder(pipeline::OlapScanLocalState* parent,
                            const std::vector<TabletWithVersion>& tablets,
-                           std::vector<TabletReader::ReadSource>& read_sources,
+                           std::vector<TabletReadSource>& read_sources,
                            const std::shared_ptr<RuntimeProfile>& profile,
                            const std::vector<OlapScanRange*>& key_ranges, 
RuntimeState* state,
                            int64_t limit, bool is_dup_mow_key, bool 
is_preaggregation)
@@ -71,7 +71,7 @@ private:
 
     std::shared_ptr<vectorized::NewOlapScanner> _build_scanner(
             BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
-            TabletReader::ReadSource&& read_source);
+            TabletReadSource&& read_source);
 
     pipeline::OlapScanLocalState* _parent;
 
@@ -94,8 +94,8 @@ private:
     bool _is_preaggregation;
     std::vector<TabletWithVersion> _tablets;
     std::vector<OlapScanRange*> _key_ranges;
-    std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
-    std::vector<TabletReader::ReadSource>& _read_sources;
+    std::unordered_map<int64_t, TabletReadSource> _all_read_sources;
+    std::vector<TabletReadSource>& _read_sources;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index fd3b4fed56f..2faed632349 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -74,7 +74,7 @@ struct RowsetReaderContext {
     uint64_t* merged_rows = nullptr;
     // for unique key merge on write
     bool enable_unique_key_merge_on_write = false;
-    const DeleteBitmap* delete_bitmap = nullptr;
+    std::shared_ptr<DeleteBitmap> delete_bitmap = nullptr;
     bool record_rowids = false;
     RowIdConversion* rowid_conversion;
     bool is_vertical_compaction = false;
diff --git a/be/src/olap/rowset_version_mgr.cpp 
b/be/src/olap/rowset_version_mgr.cpp
new file mode 100644
index 00000000000..27df2ede4b2
--- /dev/null
+++ b/be/src/olap/rowset_version_mgr.cpp
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/controller.h>
+#include <bthread/bthread.h>
+#include <bthread/countdown_event.h>
+#include <bthread/mutex.h>
+#include <bthread/types.h>
+#include <bvar/latency_recorder.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <glog/logging.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <ranges>
+#include <sstream>
+#include <utility>
+
+#include "cloud/config.h"
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "olap/base_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader.h"
+#include "runtime/client_cache.h"
+#include "service/backend_options.h"
+#include "service/internal_service.h"
+#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+
+namespace doris {
+
+using namespace ErrorCode;
+using namespace std::ranges;
+
+static bvar::LatencyRecorder 
g_remote_fetch_tablet_rowsets_single_request_latency(
+        "remote_fetch_rowsets_single_rpc");
+static bvar::LatencyRecorder 
g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
+
+// Captures a consistent path of versions covering `version_range` from the tablet's version
+// tracker. NOTE(review): the `_unlocked` suffix suggests the caller already holds the tablet
+// header lock (capture_read_source() does) -- confirm for other callers.
+//
+// @param version_range  the version range to cover
+// @param options        `quiet` silences the diagnostics of a failed capture;
+//                       `skip_missing_versions` lets a capture with missing versions succeed
+// @return the version path on success. On failure: VERSION_ALREADY_MERGED when no version is
+//         actually missing, otherwise the error reported by the version tracker.
+[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    std::vector<Version> version_path;
+    auto st =
+            _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path);
+    if (!st) {
+        if (options.quiet) {
+            // `quiet` must only silence the diagnostics below; a failed capture must never be
+            // reported to the caller as a successful (and incomplete) version path.
+            return ResultError(std::move(st));
+        }
+        auto missed_versions = get_missed_versions_unlocked(version_range.second);
+        if (missed_versions.empty()) {
+            LOG(WARNING) << fmt::format(
+                    "version already has been merged. version_range={}, max_version={}, "
+                    "tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
+                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
+        }
+        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
+                                    version_range.to_string(), tablet_id(), st);
+        _print_missed_versions(missed_versions);
+        if (!options.skip_missing_versions) {
+            return ResultError(std::move(st));
+        }
+        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
+    }
+    // Debug point used by tests to simulate a VERSION_ALREADY_MERGED failure.
+    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
+        auto tablet_id = dp->param<int64>("tablet_id", -1);
+        auto skip_by_option = dp->param<bool>("skip_by_option", false);
+        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
+            return version_path;
+        }
+        // Explicit parentheses: same meaning as before, without relying on && / || precedence.
+        if ((tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) || tablet_id == -2) {
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
+        }
+    });
+    return version_path;
+}
+
+// Resolves `version_range` into the concrete rowsets that cover it.
+//
+// The version path is captured locally first and every version on it is looked up among the
+// active rowsets, then (if `options.include_stale_rowsets`) among the stale ones. When the local
+// version capture fails, cloud-mode tablets with `options.enable_fetch_rowsets_from_peers` set
+// fall back to fetching the rowsets from peer BEs.
+//
+// @return the rowsets (plus the delete bitmap for unique-key merge-on-write tablets), or an
+//         error describing why they could not be captured.
+[[nodiscard]] Result<CaptureRowsetResult> BaseTablet::capture_consistent_rowsets_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    auto maybe_versions = capture_consistent_versions_unlocked(version_range, options);
+    if (!maybe_versions) {
+        // Local capture failed: only cloud mode with peer fetching enabled may try the peers.
+        const bool may_fetch_from_peers =
+                config::is_cloud_mode() && options.enable_fetch_rowsets_from_peers;
+        if (!may_fetch_from_peers) {
+            return ResultError(std::move(maybe_versions.error()));
+        }
+        auto remote_result = _remote_capture_rowsets(version_range);
+        if (!remote_result) {
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
+                    "version already merged, meet error during remote capturing rowsets, "
+                    "error={}, version_range={}",
+                    remote_result.error().to_string(), version_range.to_string()));
+        }
+        return remote_result;
+    }
+
+    CaptureRowsetResult captured;
+    const auto& version_path = maybe_versions.value();
+    captured.rowsets.reserve(version_path.size());
+    for (const auto& version : version_path) {
+        // Active rowsets take precedence over stale ones.
+        if (auto active_it = _rs_version_map.find(version); active_it != _rs_version_map.end()) {
+            captured.rowsets.push_back(active_it->second);
+            continue;
+        }
+        VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id()
+                    << ", version='" << version.first << "-" << version.second;
+        if (options.include_stale_rowsets) {
+            if (auto stale_it = _stale_rs_version_map.find(version);
+                stale_it != _stale_rs_version_map.end()) {
+                captured.rowsets.push_back(stale_it->second);
+                continue;
+            }
+            LOG(WARNING) << fmt::format(
+                    "fail to find Rowset in stale_rs_version for version. tablet={}, "
+                    "version={}-{}",
+                    tablet_id(), version.first, version.second);
+        }
+        return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
+                "failed to find rowset for version={}", version.to_string()));
+    }
+
+    // Only unique-key merge-on-write tablets carry a delete bitmap.
+    if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+        captured.delete_bitmap = _tablet_meta->delete_bitmap();
+    }
+    return captured;
+}
+
+// Captures the rowsets covering `version_range` and wraps a freshly created reader for each of
+// them into a RowSetSplits.
+//
+// @return one split per captured rowset, the capture error, or CAPTURE_ROWSET_READER_ERROR when
+//         a reader cannot be created.
+[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    auto captured = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!captured) {
+        return ResultError(std::move(captured.error()));
+    }
+    const auto& captured_rowsets = captured.value().rowsets;
+    std::vector<RowSetSplits> splits;
+    splits.reserve(captured_rowsets.size());
+    for (const auto& rowset : captured_rowsets) {
+        RowsetReaderSharedPtr reader;
+        if (auto st = rowset->create_reader(&reader); !st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), st.to_string()));
+        }
+        splits.emplace_back(std::move(reader));
+    }
+    return splits;
+}
+
+// Builds the read source for `version_range`: takes the header lock in shared mode, captures
+// the covering rowsets and creates one reader per rowset. The delete bitmap returned by the
+// capture is handed over to the read source.
+//
+// @return the read source, the capture error, or CAPTURE_ROWSET_READER_ERROR when a reader
+//         cannot be created.
+[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
+        const Version& version_range, const CaptureRowsetOps& options) {
+    std::shared_lock rdlock(get_header_lock());
+    auto maybe_captured = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!maybe_captured) {
+        return ResultError(std::move(maybe_captured.error()));
+    }
+    CaptureRowsetResult captured = std::move(maybe_captured.value());
+
+    TabletReadSource source;
+    source.delete_bitmap = std::move(captured.delete_bitmap);
+    source.rs_splits.reserve(captured.rowsets.size());
+    for (const auto& rowset : captured.rowsets) {
+        RowsetReaderSharedPtr reader;
+        if (auto st = rowset->create_reader(&reader); !st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), st.to_string()));
+        }
+        source.rs_splits.emplace_back(std::move(reader));
+    }
+    return source;
+}
+
+// Runs `fn(args...)` in a newly started background bthread.
+//
+// `fn` and `args` are copied into a heap-allocated closure. On success the bthread owns the
+// closure and frees it after running it; on failure the closure is freed here.
+//
+// @param th    receives the id of the started bthread
+// @param attr  attributes for the new bthread
+// @return true when the bthread was started, false otherwise (`fn` is then never invoked)
+template <typename Fn, typename... Args>
+bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
+    auto bound = [=] { fn(args...); };
+    using Task = decltype(bound);
+    auto task = std::make_unique<Task>(std::move(bound));
+    auto call_back = [](void* ar) -> void* {
+        // Re-acquire ownership so the closure is released once it has run.
+        std::unique_ptr<Task> owned(static_cast<Task*>(ar));
+        (*owned)();
+        return nullptr;
+    };
+    if (bthread_start_background(&th, attr, call_back, task.get()) != 0) {
+        // The bthread never started: `task` still owns the closure and frees it on return.
+        return false;
+    }
+    // Ownership has moved to the running bthread.
+    (void)task.release();
+    return true;
+}
+
+// Controller that asks peer BEs for the rowset metas (and optionally the delete bitmap) of a
+// tablet version range.
+//
+// Usage: fill `tablet_id`, `req_addrs`, `version_range` (and `delete_bitmap_keys` if needed),
+// call start_req_bg(), then wait_for_ret(). The first successful response provides the result;
+// an error is published only once every peer has failed.
+struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
+    struct RemoteGetRowsetResult {
+        std::vector<RowsetMetaSharedPtr> rowsets;
+        // Null when the peer's response carried no delete bitmap.
+        std::unique_ptr<DeleteBitmap> delete_bitmap;
+    };
+
+    // Sends one get_tablet_rowsets request per address in `req_addrs`. Every request runs in
+    // its own bthread, which is joined before the next request is started.
+    //
+    // @return an error when there is no address to ask or a bthread cannot be created/joined;
+    //         the outcome of the requests themselves is obtained through wait_for_ret().
+    Status start_req_bg() {
+        if (req_addrs.empty()) {
+            // Nothing would ever signal `event`; fail fast instead of blocking the waiter.
+            return Status::InternalError("no peer address to request rowsets for tablet={}",
+                                         tablet_id);
+        }
+        task_cnt = req_addrs.size();
+        for (const auto& [ip, port] : req_addrs) {
+            bthread_t tid;
+            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+
+            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() {
+                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
+                Defer defer_log {[&ip, port]() {
+                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
+                              << ", port=" << port;
+                }};
+
+                PGetTabletRowsetsRequest req;
+                req.set_tablet_id(self->tablet_id);
+                req.set_version_start(self->version_range.first);
+                req.set_version_end(self->version_range.second);
+                if (self->delete_bitmap_keys.has_value()) {
+                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
+                }
+                brpc::Controller cntl;
+                cntl.set_timeout_ms(60000);
+                cntl.set_max_retry(3);
+                PGetTabletRowsetsResponse response;
+                // Failure that happens before any RPC is issued (no stub for the peer).
+                Status send_st = Status::OK();
+                auto start_tm_us = MonotonicMicros();
+#ifndef BE_TEST
+                std::shared_ptr<PBackendService_Stub> stub =
+                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
+                if (stub == nullptr) {
+                    // Handled below like any other failed request, so that the bookkeeping
+                    // (`task_cnt`, `done`, `event`) stays consistent and the waiter is woken up.
+                    send_st = Status::InternalError(
+                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port);
+                } else {
+                    stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
+                }
+#else
+                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
+#endif
+                if (send_st.ok()) {
+                    g_remote_fetch_tablet_rowsets_single_request_latency
+                            << MonotonicMicros() - start_tm_us;
+                }
+
+                std::unique_lock l(self->butex);
+                if (self->done) {
+                    // Another peer already produced the final result.
+                    return;
+                }
+                --self->task_cnt;
+                auto resp_st = Status::create(response.status());
+                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
+                                { resp_st = Status::InternalError("inject error"); });
+                if (!send_st || cntl.Failed() || !resp_st) {
+                    if (self->task_cnt != 0) {
+                        // Other peers are still to be asked; one of them may succeed.
+                        return;
+                    }
+                    std::stringstream reason;
+                    reason << "failed to get rowsets from all replicas, tablet_id="
+                           << self->tablet_id;
+                    if (!send_st) {
+                        reason << ", reason=" << send_st.to_string();
+                    } else if (cntl.Failed()) {
+                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
+                    } else {
+                        reason << ", reason=" << resp_st.to_string();
+                    }
+                    self->result = ResultError(Status::InternalError(reason.str()));
+                    self->done = true;
+                    self->event.signal();
+                    return;
+                }
+
+                // From here on this response decides the final result, whatever it is.
+                Defer done_cb {[&]() {
+                    self->done = true;
+                    self->event.signal();
+                }};
+                std::vector<RowsetMetaSharedPtr> rs_metas;
+                for (auto&& rs_pb : response.rowsets()) {
+                    auto rs_meta = std::make_shared<RowsetMeta>();
+                    if (!rs_meta->init_from_pb(rs_pb)) {
+                        self->result =
+                                ResultError(Status::InternalError("failed to init rowset from pb"));
+                        return;
+                    }
+                    rs_metas.push_back(std::move(rs_meta));
+                }
+                self->result->rowsets = std::move(rs_metas);
+
+                if (response.has_delete_bitmap()) {
+                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
+                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
+                }
+            });
+
+            if (!succ) {
+                return Status::InternalError(
+                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
+            }
+            if (bthread_join(tid, nullptr) != 0) {
+                return Status::InternalError("failed to join bthread tid={}", tid);
+            }
+        }
+        return Status::OK();
+    }
+
+    // Blocks until a request has published the final result, then hands it over.
+    Result<RemoteGetRowsetResult> wait_for_ret() {
+        event.wait();
+        return std::move(result);
+    }
+
+    int64_t tablet_id;
+    std::vector<std::pair<std::string, int32_t>> req_addrs;
+    Version version_range;
+    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;
+
+private:
+    // Number of requests that have not reported back yet.
+    size_t task_cnt = 0;
+
+    // Guards `task_cnt`, `done` and `result`.
+    bthread::Mutex butex;
+    bthread::CountdownEvent event {1};
+    bool done = false;
+
+    Result<RemoteGetRowsetResult> result;
+};
+
+// Asks the master FE for the replicas of `tablet_id` and returns the (host, brpc port) pairs of
+// the peers, i.e. every replica that is not hosted on the local backend.
+//
+// @return the peer addresses (possibly empty), or an error when the RPC fails or the FE does
+//         not know the tablet.
+Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
+        const int64_t tablet_id) {
+    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
+    DCHECK_NE(cluster_info, nullptr);
+    auto master_addr = cluster_info->master_fe_addr;
+    TGetTabletReplicaInfosRequest req;
+    req.tablet_ids.push_back(tablet_id);
+    TGetTabletReplicaInfosResult resp;
+    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); });
+    if (!st) {
+        return ResultError(Status::InternalError(
+                "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(),
+                tablet_id));
+    }
+
+    auto it = resp.tablet_replica_infos.find(tablet_id);
+    if (it == resp.tablet_replica_infos.end()) {
+        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
+    }
+    // `resp` is local to this function, so the hosts can be moved out of it without a copy.
+    auto& replicas = it->second;
+    const auto local_host = BackendOptions::get_localhost();
+    bool include_local_host = false;
+    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });
+    auto ret_view =
+            replicas | std::views::filter([&local_host, include_local_host](const auto& replica) {
+                // Compare whole host names: a substring test would also drop peers whose
+                // address merely is a part of the local one (e.g. 10.0.0.1 vs 10.0.0.12).
+                // NOTE(review): assumes get_localhost() returns a single host -- confirm.
+                return include_local_host || replica.host != local_host;
+            }) |
+            std::views::transform([](auto& replica) {
+                return std::make_pair(std::move(replica.host), replica.brpc_port);
+            });
+    return std::vector(ret_view.begin(), ret_view.end());
+}
+
+// Fetches the rowsets covering `version_range` from the peer replicas of this tablet.
+//
+// For unique-key merge-on-write tablets a snapshot of the local delete bitmap is taken, its
+// keys are sent along with the request, and the delete bitmap returned by the peer (if any) is
+// merged into that snapshot.
+//
+// @return the rowsets built from the peer's rowset metas (plus the delete bitmap for
+//         merge-on-write tablets), or an error when no peer could serve the request.
+Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
+        const Version& version_range) const {
+    auto start_tm_us = MonotonicMicros();
+    Defer defer {
+            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
+#ifndef BE_TEST
+    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
+#else
+    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
+    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
+#endif
+    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
+                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
+    if (!maybe_be_addresses) {
+        return ResultError(std::move(maybe_be_addresses.error()));
+    }
+    auto be_addresses = std::move(maybe_be_addresses.value());
+    if (be_addresses.empty()) {
+        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
+        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
+    }
+
+    auto cntl = std::make_shared<GetRowsetsCntl>();
+    cntl->tablet_id = tablet_id();
+    cntl->req_addrs = std::move(be_addresses);
+    cntl->version_range = version_range;
+    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
+    CaptureRowsetResult result;
+    if (is_mow) {
+        // Work on a snapshot so that the tablet's own delete bitmap is left untouched.
+        result.delete_bitmap =
+                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap()->snapshot());
+        DeleteBitmapPB delete_bitmap_keys;
+        auto keyset = result.delete_bitmap->delete_bitmap |
+                      std::views::transform([](const auto& kv) { return kv.first; });
+        for (const auto& key : keyset) {
+            const auto& [rs_id, seg_id, version] = key;
+            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
+            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
+            delete_bitmap_keys.mutable_versions()->Add(version);
+        }
+        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
+    }
+
+    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
+    auto maybe_meta = cntl->wait_for_ret();
+    if (!maybe_meta) {
+        auto err = Status::InternalError(
+                "tried to get rowsets from peer replicas and failed, "
+                "reason={}",
+                maybe_meta.error());
+        return ResultError(std::move(err));
+    }
+
+    auto& remote_meta = maybe_meta.value();
+    const auto& rs_metas = remote_meta.rowsets;
+    result.rowsets.reserve(rs_metas.size());
+    for (const auto& rs_meta : rs_metas) {
+        RowsetSharedPtr rs;
+        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
+        if (!st) {
+            return ResultError(std::move(st));
+        }
+        result.rowsets.push_back(std::move(rs));
+    }
+    if (is_mow) {
+        DCHECK_NE(result.delete_bitmap, nullptr);
+        // The remote delete bitmap is only set when the peer's response carried one, so it
+        // may be null here; merging then has nothing to add.
+        if (remote_meta.delete_bitmap != nullptr) {
+            result.delete_bitmap->merge(*remote_meta.delete_bitmap);
+        }
+    }
+    return result;
+}
+
+} // namespace doris
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ae285a94aff..7fd6e96730e 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -496,12 +496,12 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr 
rowset_reader, RowsetWr
     // copy delete bitmap to new tablet.
     if (new_tablet->keys_type() == UNIQUE_KEYS && 
new_tablet->enable_unique_key_merge_on_write()) {
         DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
-        base_tablet->tablet_meta()->delete_bitmap().subset(
+        base_tablet->tablet_meta()->delete_bitmap()->subset(
                 {rowset_reader->rowset()->rowset_id(), 0, 0},
                 {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
                 &origin_delete_bitmap);
         for (auto& iter : origin_delete_bitmap.delete_bitmap) {
-            int ret = new_tablet->tablet_meta()->delete_bitmap().set(
+            int ret = new_tablet->tablet_meta()->delete_bitmap()->set(
                     {rowset_writer->rowset_id(), std::get<1>(iter.first), 
std::get<2>(iter.first)},
                     iter.second);
             DCHECK(ret == 1);
@@ -958,7 +958,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const 
TAlterTabletReqV2& reques
             reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
             reader_context.is_unique = _base_tablet->keys_type() == 
UNIQUE_KEYS;
             reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
-            reader_context.delete_bitmap = 
&_base_tablet->tablet_meta()->delete_bitmap();
+            reader_context.delete_bitmap = 
_base_tablet->tablet_meta()->delete_bitmap();
             reader_context.version = Version(0, end_version);
             for (auto& rs_split : rs_splits) {
                 res = rs_split.rs_reader->init(&reader_context);
@@ -1073,10 +1073,8 @@ Status 
SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versio
                                                           
_base_tablet->tablet_id());
     }
     *max_rowset = rowset;
-
-    RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked(
-            Version(0, rowset->version().second), versions_to_be_changed, 
false, false));
-
+    *versions_to_be_changed = 
DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked(
+            Version(0, rowset->version().second), {}));
     return Status::OK();
 }
 
@@ -1478,8 +1476,9 @@ Status 
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
                   << "double write rowsets for version: " << alter_version + 1 
<< "-" << max_version
                   << " new_tablet=" << _new_tablet->tablet_id();
         std::shared_lock rlock(_new_tablet->get_header_lock());
-        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
-                {alter_version + 1, max_version}, &rowsets));
+        auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+                {alter_version + 1, max_version}, CaptureRowsetOps {}));
+        rowsets = std::move(ret.rowsets);
     }
     for (auto rowset_ptr : rowsets) {
         std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
@@ -1497,8 +1496,9 @@ Status 
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
         LOG(INFO) << "alter table for unique with merge-on-write, calculate 
delete bitmap of "
                   << "incremental rowsets for version: " << max_version + 1 << 
"-"
                   << new_max_version << " new_tablet=" << 
_new_tablet->tablet_id();
-        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
-                {max_version + 1, new_max_version}, &rowsets));
+        auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+                {max_version + 1, new_max_version}, CaptureRowsetOps {}));
+        rowsets = std::move(ret.rowsets);
     }
     for (auto&& rowset_ptr : rowsets) {
         RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, 
rowset_ptr));
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index a59ed36bb82..6d2c956ac04 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -40,6 +40,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/local_file_system.h"
+#include "olap/base_tablet.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -567,13 +568,23 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                     res = check_version_continuity(consistent_rowsets);
                     if (res.ok() && max_cooldowned_version < version) {
                         // Pick consistent rowsets of remaining required 
version
-                        res = ref_tablet->capture_consistent_rowsets_unlocked(
-                                {max_cooldowned_version + 1, version}, 
&consistent_rowsets);
+                        auto ret = 
ref_tablet->capture_consistent_rowsets_unlocked(
+                                {max_cooldowned_version + 1, version}, 
CaptureRowsetOps {});
+                        if (ret) {
+                            consistent_rowsets = std::move(ret->rowsets);
+                        } else {
+                            res = std::move(ret.error());
+                        }
                     }
                 } else {
                     // get shortest version path
-                    res = 
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
-                                                                          
&consistent_rowsets);
+                    auto ret = 
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
+                                                                               
CaptureRowsetOps {});
+                    if (ret) {
+                        consistent_rowsets = std::move(ret->rowsets);
+                    } else {
+                        res = std::move(ret.error());
+                    }
                 }
                 if (!res.ok()) {
                     LOG(WARNING) << "fail to select versions to span. res=" << 
res;
@@ -594,7 +605,7 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
             if (ref_tablet->keys_type() == UNIQUE_KEYS &&
                 ref_tablet->enable_unique_key_merge_on_write()) {
                 delete_bitmap_snapshot =
-                        
ref_tablet->tablet_meta()->delete_bitmap().snapshot(version);
+                        
ref_tablet->tablet_meta()->delete_bitmap()->snapshot(version);
             }
         }
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5c4770e3a33..403438f1dad 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -391,12 +391,14 @@ Status Tablet::revise_tablet_meta(const 
std::vector<RowsetSharedPtr>& to_add,
         }
 
         if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) {
-            calc_bm_status = 
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
-                                                                 
&calc_delete_bitmap_rowsets);
-            if (!calc_bm_status.ok()) {
-                LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << 
calc_bm_status;
+            auto ret = 
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
+                                                           CaptureRowsetOps 
{});
+            if (!ret) {
+                LOG(WARNING) << "fail to capture_consistent_rowsets, res: " << 
ret.error();
+                calc_bm_status = std::move(ret.error());
                 break;
             }
+            calc_delete_bitmap_rowsets = std::move(ret->rowsets);
             // FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter
             auto self = _engine.tablet_manager()->get_tablet(tablet_id());
             CHECK(self);
@@ -451,17 +453,16 @@ Status Tablet::revise_tablet_meta(const 
std::vector<RowsetSharedPtr>& to_add,
         // that we can capture by version
         if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
             Version full_version = Version(0, max_version_unlocked());
-            std::vector<RowsetSharedPtr> expected_rowsets;
-            auto st = capture_consistent_rowsets_unlocked(full_version, 
&expected_rowsets);
-            DCHECK(st.ok()) << st;
-            DCHECK_EQ(base_rowsets_for_full_clone.size(), 
expected_rowsets.size());
-            if (st.ok() && base_rowsets_for_full_clone.size() != 
expected_rowsets.size())
-                    [[unlikely]] {
+            auto ret = capture_consistent_rowsets_unlocked(full_version, 
CaptureRowsetOps {});
+            DCHECK(ret) << ret.error();
+            DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size());
+
+            if (ret && base_rowsets_for_full_clone.size() != 
ret->rowsets.size()) [[unlikely]] {
                 LOG(WARNING) << "full clone succeeded, but the count("
                              << base_rowsets_for_full_clone.size()
                              << ") of base rowsets used for delete bitmap 
calculation is not match "
                                 "expect count("
-                             << expected_rowsets.size() << ") we capture from 
tablet meta";
+                             << ret->rowsets.size() << ") we capture from 
tablet meta";
             }
         }
     }
@@ -747,10 +748,9 @@ void Tablet::delete_expired_stale_rowset() {
             Version test_version = Version(0, lastest_delta->end_version());
             stale_version_path_map[*path_id_iter] = version_path;
 
-            Status status =
-                    capture_consistent_versions_unlocked(test_version, 
nullptr, false, false);
+            auto ret = capture_consistent_versions_unlocked(test_version, {});
             // 1. When there is no consistent versions, we must reconstruct 
the tracker.
-            if (!status.ok()) {
+            if (!ret) {
                 // 2. fetch missing version after delete
                 Versions after_missed_versions =
                         
get_missed_versions_unlocked(lastest_delta->end_version());
@@ -865,51 +865,11 @@ void Tablet::delete_expired_stale_rowset() {
                     { _engine.start_delete_unused_rowset(); });
 }
 
-Status Tablet::capture_consistent_versions_unlocked(const Version& 
spec_version,
-                                                    Versions* version_path,
-                                                    bool skip_missing_version, 
bool quiet) const {
-    Status status =
-            
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
version_path);
-    if (!status.ok() && !quiet) {
-        Versions missed_versions = 
get_missed_versions_unlocked(spec_version.second);
-        if (missed_versions.empty()) {
-            // if version_path is null, it may be a compaction check logic.
-            // so to avoid print too many logs.
-            if (version_path != nullptr) {
-                LOG(WARNING) << "tablet:" << tablet_id()
-                             << ", version already has been merged. 
spec_version: " << spec_version
-                             << ", max_version: " << max_version_unlocked();
-            }
-            status = Status::Error<VERSION_ALREADY_MERGED, false>(
-                    "versions are already compacted, spec_version "
-                    "{}, max_version {}, tablet_id {}",
-                    spec_version.second, max_version_unlocked(), tablet_id());
-        } else {
-            if (version_path != nullptr) {
-                LOG(WARNING) << "status:" << status << ", tablet:" << 
tablet_id()
-                             << ", missed version for version:" << 
spec_version;
-                _print_missed_versions(missed_versions);
-                if (skip_missing_version) {
-                    LOG(WARNING) << "force skipping missing version for 
tablet:" << tablet_id();
-                    return Status::OK();
-                }
-            }
-        }
-    }
-
-    DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", {
-        auto tablet_id = dp->param<int64>("tablet_id", -1);
-        if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
-            status = Status::Error<VERSION_ALREADY_MERGED>("version already 
merged");
-        }
-    });
-
-    return status;
-}
-
 Status Tablet::check_version_integrity(const Version& version, bool quiet) {
     std::shared_lock rdlock(_meta_lock);
-    return capture_consistent_versions_unlocked(version, nullptr, false, 
quiet);
+    [[maybe_unused]] auto _versions = DORIS_TRY(
+            capture_consistent_versions_unlocked(version, CaptureRowsetOps 
{.quiet = quiet}));
+    return Status::OK();
 }
 
 bool Tablet::exceed_version_limit(int32_t limit) {
@@ -939,22 +899,12 @@ void Tablet::acquire_version_and_rowsets(
     }
 }
 
-Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version,
-                                                   
std::vector<RowsetSharedPtr>* rowsets) const {
-    std::vector<Version> version_path;
-    RETURN_IF_ERROR(
-            capture_consistent_versions_unlocked(spec_version, &version_path, 
false, false));
-    RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, 
rowsets));
-    return Status::OK();
-}
-
 Status Tablet::capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                                   bool skip_missing_version) {
     std::shared_lock rlock(_meta_lock);
     std::vector<Version> version_path;
-    RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, 
&version_path,
-                                                         skip_missing_version, 
false));
-    RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits));
+    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+            spec_version, CaptureRowsetOps {.skip_missing_versions = 
skip_missing_version}));
     return Status::OK();
 }
 
@@ -2514,8 +2464,8 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo* 
txn_info, int64_t txn_id,
     for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
         // skip sentinel mark, which is used for delete bitmap correctness 
check
         if (std::get<1>(key) != DeleteBitmap::INVALID_SEGMENT_ID) {
-            _tablet_meta->delete_bitmap().merge({std::get<0>(key), 
std::get<1>(key), cur_version},
-                                                bitmap);
+            _tablet_meta->delete_bitmap()->merge({std::get<0>(key), 
std::get<1>(key), cur_version},
+                                                 bitmap);
         }
     }
 
@@ -2523,7 +2473,7 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo* 
txn_info, int64_t txn_id,
 }
 
 void Tablet::merge_delete_bitmap(const DeleteBitmap& delete_bitmap) {
-    _tablet_meta->delete_bitmap().merge(delete_bitmap);
+    _tablet_meta->delete_bitmap()->merge(delete_bitmap);
 }
 
 bool Tablet::check_all_rowset_segment() {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 467fc51f985..a8c9df89ff0 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -180,21 +180,12 @@ public:
     /// need to delete flag.
     void delete_expired_stale_rowset();
 
-    // Given spec_version, find a continuous version path and store it in 
version_path.
-    // If quiet is true, then only "does this path exist" is returned.
-    // If skip_missing_version is true, return ok even there are missing 
versions.
-    Status capture_consistent_versions_unlocked(const Version& spec_version, 
Versions* version_path,
-                                                bool skip_missing_version, 
bool quiet) const;
-
     // if quiet is true, no error log will be printed if there are missing 
versions
     Status check_version_integrity(const Version& version, bool quiet = false);
     bool check_version_exist(const Version& version) const;
     void acquire_version_and_rowsets(
             std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) 
const;
 
-    Status capture_consistent_rowsets_unlocked(
-            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const override;
-
     // If skip_missing_version is true, skip versions if they are missing.
     Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                               bool skip_missing_version) override;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f74cf2bf1f6..3e7d48e44af 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1764,7 +1764,7 @@ void TabletManager::get_topn_tablet_delete_bitmap_score(
     buf.reserve(n + 1);
     auto handler = [&](const TabletSharedPtr& tablet) {
         uint64_t delete_bitmap_count =
-                
tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
+                
tablet->tablet_meta()->delete_bitmap()->get_delete_bitmap_count();
         total_delete_map_count += delete_bitmap_count;
         if (delete_bitmap_count > *max_delete_bitmap_score) {
             max_delete_bitmap_score_tablet_id = tablet->tablet_id();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 3a0ff3419ee..c1dbc8a9394 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -29,6 +29,7 @@
 #include <cstdint>
 #include <memory>
 #include <random>
+#include <ranges>
 #include <set>
 #include <utility>
 
@@ -455,12 +456,12 @@ void TabletMeta::init_column_from_tcolumn(uint32_t 
unique_id, const TColumn& tco
 
 void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const 
Version& version) {
     if (_enable_unique_key_merge_on_write) {
-        delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
+        delete_bitmap()->remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
         if (config::enable_mow_verbose_log) {
             LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={}, 
version={}", tablet_id(),
                      rowset_id.to_string(), version.to_string());
         }
-        size_t rowset_cache_version_size = 
delete_bitmap().remove_rowset_cache_version(rowset_id);
+        size_t rowset_cache_version_size = 
delete_bitmap()->remove_rowset_cache_version(rowset_id);
         _check_mow_rowset_cache_version_size(rowset_cache_version_size);
     }
 }
@@ -705,7 +706,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
             auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
             auto ver = tablet_meta_pb.delete_bitmap().versions(i);
             auto bitmap = 
tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
-            delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = 
roaring::Roaring::read(bitmap);
+            delete_bitmap()->delete_bitmap[{rst_id, seg_id, ver}] = 
roaring::Roaring::read(bitmap);
         }
     }
 
@@ -789,7 +790,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
             stale_rs_ids.insert(rowset->rowset_id());
         }
         DeleteBitmapPB* delete_bitmap_pb = 
tablet_meta_pb->mutable_delete_bitmap();
-        for (auto& [id, bitmap] : delete_bitmap().snapshot().delete_bitmap) {
+        for (auto& [id, bitmap] : delete_bitmap()->snapshot().delete_bitmap) {
             auto& [rowset_id, segment_id, ver] = id;
             if (stale_rs_ids.count(rowset_id) != 0) {
                 continue;
@@ -1121,6 +1122,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) {
     return *this;
 }
 
+// Rebuilds a DeleteBitmap for `tablet_id` from its protobuf form. The repeated
+// fields of `pb` are read as parallel arrays: entry i of rowset_ids /
+// segment_ids / versions forms the key, and entry i of segment_delete_bitmaps
+// is the serialized bitmap stored under that key.
+DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t tablet_id) {
+    // Protobuf repeated-field sizes and indexes are `int`; keep the length and the
+    // loop index in that type so the comparison does not mix signed and unsigned.
+    const int len = pb.rowset_ids().size();
+    DCHECK_EQ(len, pb.segment_ids().size());
+    DCHECK_EQ(len, pb.versions().size());
+    // The bitmap payloads are indexed below too, so they must line up with the keys.
+    DCHECK_EQ(len, pb.segment_delete_bitmaps().size());
+    DeleteBitmap delete_bitmap(tablet_id);
+    for (int i = 0; i < len; ++i) {
+        RowsetId rs_id;
+        rs_id.init(pb.rowset_ids(i));
+        BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)};
+        delete_bitmap.delete_bitmap[key] =
+                roaring::Roaring::read(pb.segment_delete_bitmaps(i).data());
+    }
+    return delete_bitmap;
+}
+
+// Serializes every entry of this delete bitmap into a DeleteBitmapPB. For each
+// map entry the key parts and the serialized bitmap bytes are appended to the
+// four repeated fields, so all of them end up with the same length and order.
+// The shared lock is held while the map is walked.
+DeleteBitmapPB DeleteBitmap::to_pb() {
+    std::shared_lock l(lock);
+    DeleteBitmapPB pb;
+    for (const auto& [key, bitmap] : delete_bitmap) {
+        const auto& [rowset_id, segment_id, version] = key;
+        pb.mutable_rowset_ids()->Add(rowset_id.to_string());
+        pb.mutable_segment_ids()->Add(segment_id);
+        pb.mutable_versions()->Add(version);
+        // Allocate a buffer of the serialized size, then write the bitmap into it.
+        std::string serialized(bitmap.getSizeInBytes(), '\0');
+        bitmap.write(serialized.data());
+        pb.mutable_segment_delete_bitmaps()->Add(std::move(serialized));
+    }
+    return pb;
+}
+
 DeleteBitmap DeleteBitmap::snapshot() const {
     std::shared_lock l(lock);
     return DeleteBitmap(*this);
@@ -1508,6 +1538,22 @@ std::shared_ptr<roaring::Roaring> 
DeleteBitmap::get_agg_without_cache(const Bitm
     return bitmap;
 }
 
+// Returns a new DeleteBitmap (same tablet id) holding a copy of every entry of
+// this bitmap whose key is NOT contained in `key_set`.
+DeleteBitmap DeleteBitmap::diffset(const std::set<BitmapKey>& key_set) const {
+    // Hold the shared lock for the whole scan of `delete_bitmap`.
+    std::shared_lock l(lock);
+    DeleteBitmap dbm(_tablet_id);
+    // Walk the map once and copy values directly: this avoids a second lookup per
+    // key and avoids calling another member function while `lock` is held.
+    for (const auto& [key, bitmap] : delete_bitmap) {
+        if (!key_set.contains(key)) {
+            dbm.delete_bitmap[key] = bitmap;
+        }
+    }
+    return dbm;
+}
+
 std::atomic<DeleteBitmap::AggCachePolicy*> DeleteBitmap::AggCache::s_repr 
{nullptr};
 
 std::string tablet_state_name(TabletState state) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 388ddc439dc..63ad0124f86 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -241,7 +241,7 @@ public:
     static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                          ColumnPB* column);
 
-    DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
+    std::shared_ptr<DeleteBitmap> delete_bitmap() { return _delete_bitmap; }
     void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& 
version);
 
     bool enable_unique_key_merge_on_write() const { return 
_enable_unique_key_merge_on_write; }
@@ -417,6 +417,10 @@ public:
     DeleteBitmap(DeleteBitmap&& r);
     DeleteBitmap& operator=(DeleteBitmap&& r);
 
+    static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id);
+
+    DeleteBitmapPB to_pb();
+
     /**
      * Makes a snapshot of delete bitmap, read lock will be acquired in this
      * process
@@ -566,6 +570,14 @@ public:
 
     std::set<RowsetId> get_rowset_cache_version();
 
+    /**
+     * Calculate diffset with given `key_set`. All entries with keys contained 
in this delete bitmap but not
+     * in given key_set will be added to the output delete bitmap.
+     *
+     * @return DeleteBitmap containing all entries in the diffset
+    */
+    DeleteBitmap diffset(const std::set<BitmapKey>& key_set) const;
+
     class AggCachePolicy : public LRUCachePolicy {
     public:
         AggCachePolicy(size_t capacity)
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index a453634ca83..7654a37de16 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -100,16 +100,6 @@ std::string TabletReader::KeysParam::to_string() const {
     return ss.str();
 }
 
-void TabletReader::ReadSource::fill_delete_predicates() {
-    DCHECK_EQ(delete_predicates.size(), 0);
-    for (auto&& split : rs_splits) {
-        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
-        if (rs_meta->has_delete_predicate()) {
-            delete_predicates.push_back(rs_meta);
-        }
-    }
-}
-
 TabletReader::~TabletReader() {
     for (auto* pred : _col_predicates) {
         delete pred;
@@ -658,7 +648,7 @@ Status TabletReader::init_reader_params_and_create_block(
     reader_params->version =
             Version(input_rowsets.front()->start_version(), 
input_rowsets.back()->end_version());
 
-    ReadSource read_source;
+    TabletReadSource read_source;
     for (const auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
         RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
@@ -680,9 +670,6 @@ Status TabletReader::init_reader_params_and_create_block(
         merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
     }
     reader_params->tablet_schema = merge_tablet_schema;
-    if (tablet->enable_unique_key_merge_on_write()) {
-        reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
-    }
 
     reader_params->return_columns.resize(read_tablet_schema->num_columns());
     std::iota(reader_params->return_columns.begin(), 
reader_params->return_columns.end(), 0);
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 87af3bb08eb..d5aac0b89b5 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -34,6 +34,7 @@
 #include "exprs/function_filter.h"
 #include "gutil/strings/substitute.h"
 #include "io/io_common.h"
+#include "olap/base_tablet.h"
 #include "olap/delete_handler.h"
 #include "olap/iterators.h"
 #include "olap/olap_common.h"
@@ -91,12 +92,6 @@ class TabletReader {
     };
 
 public:
-    struct ReadSource {
-        std::vector<RowSetSplits> rs_splits;
-        std::vector<RowsetMetaSharedPtr> delete_predicates;
-        // Fill delete predicates with `rs_splits`
-        void fill_delete_predicates();
-    };
     // Params for Reader,
     // mainly include tablet, data version and fetch range.
     struct ReaderParams {
@@ -117,9 +112,12 @@ public:
             return BeExecVersionManager::get_newest_version();
         }
 
-        void set_read_source(ReadSource read_source) {
+        void set_read_source(TabletReadSource read_source, bool 
skip_delete_bitmap = false) {
             rs_splits = std::move(read_source.rs_splits);
             delete_predicates = std::move(read_source.delete_predicates);
+            if (tablet->enable_unique_key_merge_on_write() && 
!skip_delete_bitmap) {
+                delete_bitmap = std::move(read_source.delete_bitmap);
+            }
         }
 
         BaseTabletSPtr tablet;
@@ -148,7 +146,7 @@ public:
 
         std::vector<RowSetSplits> rs_splits;
         // For unique key table with merge-on-write
-        DeleteBitmap* delete_bitmap = nullptr;
+        std::shared_ptr<DeleteBitmap> delete_bitmap = nullptr;
 
         // return_columns is init from query schema
         std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/task/engine_checksum_task.cpp 
b/be/src/olap/task/engine_checksum_task.cpp
index 05ecfc0401b..c6cf69d54d9 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "io/io_common.h"
+#include "olap/base_tablet.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
@@ -81,13 +82,15 @@ Status EngineChecksumTask::_compute_checksum() {
     vectorized::Block block;
     {
         std::shared_lock rdlock(tablet->get_header_lock());
-        Status acquire_reader_st =
-                tablet->capture_consistent_rowsets_unlocked(version, 
&input_rowsets);
-        if (!acquire_reader_st.ok()) {
+        auto ret = tablet->capture_consistent_rowsets_unlocked(version, 
CaptureRowsetOps {});
+        if (ret) {
+            input_rowsets = std::move(ret->rowsets);
+        } else {
             LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << 
tablet->tablet_id()
-                         << "res=" << acquire_reader_st;
-            return acquire_reader_st;
+                         << "res=" << ret.error();
+            return std::move(ret.error());
         }
+
         RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block(
                 tablet, ReaderType::READER_CHECKSUM, input_rowsets, 
&reader_params, &block));
     }
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index ecf1bdfc6d5..6a9e66f1d38 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -984,7 +984,7 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
         }
     }
     if (tablet->enable_unique_key_merge_on_write()) {
-        
tablet->tablet_meta()->delete_bitmap().merge(cloned_tablet_meta->delete_bitmap());
+        
tablet->tablet_meta()->delete_bitmap()->merge(*cloned_tablet_meta->delete_bitmap());
     }
     return tablet->revise_tablet_meta(to_add, to_delete, false);
     // TODO(plat1ko): write cooldown meta to remote if this replica is 
cooldown replica
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index 210aa6a8c56..d9352a0ea9d 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -32,8 +32,10 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "gutil/strings/numbers.h"
 #include "io/fs/local_file_system.h"
+#include "olap/base_tablet.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -87,8 +89,10 @@ Status EngineStorageMigrationTask::_get_versions(int32_t 
start_version, int32_t*
                    << ", start_version=" << start_version << ", end_version=" 
<< *end_version;
         return Status::OK();
     }
-    return _tablet->capture_consistent_rowsets_unlocked(Version(start_version, 
*end_version),
-                                                        consistent_rowsets);
+    auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+            Version(start_version, *end_version), CaptureRowsetOps {}));
+    *consistent_rowsets = std::move(ret.rowsets);
+    return Status::OK();
 }
 
 bool EngineStorageMigrationTask::_is_timeout() {
@@ -354,7 +358,7 @@ void EngineStorageMigrationTask::_generate_new_header(
     }
     new_tablet_meta->revise_rs_metas(std::move(rs_metas));
     if (_tablet->keys_type() == UNIQUE_KEYS && 
_tablet->enable_unique_key_merge_on_write()) {
-        DeleteBitmap bm = 
_tablet->tablet_meta()->delete_bitmap().snapshot(end_version);
+        DeleteBitmap bm = 
_tablet->tablet_meta()->delete_bitmap()->snapshot(end_version);
         new_tablet_meta->revise_delete_bitmap_unlocked(bm);
     }
     new_tablet_meta->set_shard_id(new_shard);
diff --git a/be/src/olap/task/index_builder.cpp 
b/be/src/olap/task/index_builder.cpp
index 9f72056af8d..ffb226bee58 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -822,7 +822,7 @@ Status IndexBuilder::modify_rowsets(const 
Merger::Statistics* stats) {
         for (auto i = 0; i < _input_rowsets.size(); ++i) {
             RowsetId input_rowset_id = _input_rowsets[i]->rowset_id();
             RowsetId output_rowset_id = _output_rowsets[i]->rowset_id();
-            for (const auto& [k, v] : 
_tablet->tablet_meta()->delete_bitmap().delete_bitmap) {
+            for (const auto& [k, v] : 
_tablet->tablet_meta()->delete_bitmap()->delete_bitmap) {
                 RowsetId rs_id = std::get<0>(k);
                 if (rs_id == input_rowset_id) {
                     DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, 
std::get<1>(k),
@@ -832,7 +832,7 @@ Status IndexBuilder::modify_rowsets(const 
Merger::Statistics* stats) {
                 }
             }
         }
-        _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
+        _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap);
 
         // modify_rowsets will remove the delete_bitmap for input rowsets,
         // should call it after merge delete_bitmap
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 53a7d8cd0f3..13d2044cc2a 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -27,6 +27,7 @@
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_hotspot.h"
 #include "cloud/config.h"
+#include "common/config.h"
 #include "olap/parallel_scanner_builder.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -528,9 +529,11 @@ Status OlapScanLocalState::hold_tablets() {
     }
 
     for (size_t i = 0; i < _scan_ranges.size(); i++) {
-        RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers(
-                {0, _tablets[i].version}, &_read_sources[i].rs_splits,
-                RuntimeFilterConsumer::_state->skip_missing_version()));
+        _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source(
+                {0, _tablets[i].version},
+                {.skip_missing_versions = 
RuntimeFilterConsumer::_state->skip_missing_version(),
+                 .enable_fetch_rowsets_from_peers =
+                         config::enable_fetch_rowsets_from_peer_replicas}));
         if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
             _read_sources[i].fill_delete_predicates();
         }
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 24a1b1b876a..e1eea0c7822 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -229,7 +229,7 @@ private:
 
     std::mutex _profile_mtx;
     std::vector<TabletWithVersion> _tablets;
-    std::vector<TabletReader::ReadSource> _read_sources;
+    std::vector<TabletReadSource> _read_sources;
 };
 
 class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 423c62c6c4b..6d5d9055fc3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -53,6 +53,9 @@
 #include <utility>
 #include <vector>
 
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/consts.h"
 #include "common/exception.h"
@@ -162,6 +165,8 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit:
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, 
MetricUnit::NOUNIT);
 
+// Latency recorder fed by PInternalService::get_tablet_rowsets, which pushes the elapsed
+// GetMonoTimeMicros() delta (microseconds) of each handled request when it returns.
+static bvar::LatencyRecorder 
g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets");
+
 bthread_key_t btls_key;
 
 static void thread_context_deleter(void* d) {
@@ -2200,4 +2205,66 @@ void 
PInternalService::get_be_resource(google::protobuf::RpcController* controll
     }
 }
 
+/// Handles the `get_tablet_rowsets` RPC: captures, from the tablet as known to this BE
+/// (looked up with local_only=true), the consistent rowsets covering
+/// [version_start, version_end] and returns their metas to the caller. When the request
+/// carries `delete_bitmap_keys`, the reply also carries DeleteBitmap::diffset() of the
+/// captured delete bitmap against those keys.
+///
+/// @param controller not used by this handler.
+/// @param request    must set tablet_id, version_start and version_end; delete_bitmap_keys is
+///                   optional and holds three index-aligned arrays (rowset_ids, segment_ids,
+///                   versions).
+/// @param response   always receives a status; rowsets / delete_bitmap are filled on success.
+/// @param done       completion closure, run on every return path through brpc::ClosureGuard.
+void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller,
+                                          const PGetTabletRowsetsRequest* request,
+                                          PGetTabletRowsetsResponse* response,
+                                          google::protobuf::Closure* done) {
+    DCHECK(config::is_cloud_mode());
+    auto start_time = GetMonoTimeMicros();
+    // Publish the handling latency on every exit path.
+    Defer defer {
+            [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }};
+    brpc::ClosureGuard closure_guard(done);
+    LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString();
+    if (!request->has_tablet_id() || !request->has_version_start() || !request->has_version_end()) {
+        Status::InvalidArgument("missing params tablet/version_start/version_end")
+                .to_protobuf(response->mutable_status());
+        return;
+    }
+    const bool want_delete_bitmap = request->has_delete_bitmap_keys();
+    if (want_delete_bitmap) {
+        // The request comes from another process, so its shape has to be validated at runtime:
+        // DCHECKs are compiled out of release builds and indexing a repeated field past its
+        // size is not a checked operation there.
+        const auto& keys_pb = request->delete_bitmap_keys();
+        if (keys_pb.segment_ids_size() != keys_pb.rowset_ids_size() ||
+            keys_pb.versions_size() != keys_pb.rowset_ids_size()) {
+            Status::InvalidArgument(
+                    "delete_bitmap_keys has mismatched rowset_ids/segment_ids/versions sizes")
+                    .to_protobuf(response->mutable_status());
+            return;
+        }
+    }
+    CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud();
+
+    auto maybe_tablet =
+            storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false,
+                                            /*sync_delete_bitmap*/ false, /*delete_bitmap*/ nullptr,
+                                            /*local_only*/ true);
+    if (!maybe_tablet) {
+        maybe_tablet.error().to_protobuf(response->mutable_status());
+        return;
+    }
+    auto tablet = maybe_tablet.value();
+    Result<CaptureRowsetResult> ret;
+    {
+        std::shared_lock l(tablet->get_header_lock());
+        // Peer fetching is switched off for this capture -- presumably so that serving a peer's
+        // fetch never triggers another round of peer fetches.
+        ret = tablet->capture_consistent_rowsets_unlocked(
+                {request->version_start(), request->version_end()},
+                CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false});
+    }
+    if (!ret) {
+        ret.error().to_protobuf(response->mutable_status());
+        return;
+    }
+    auto& captured = ret.value();
+    // Reject the request before filling the response, so a failed reply never carries
+    // partially populated rowsets.
+    if (want_delete_bitmap &&
+        (!tablet->enable_unique_key_merge_on_write() || !captured.delete_bitmap)) {
+        Status::InvalidArgument("delete bitmap requested but tablet has no merge-on-write bitmap")
+                .to_protobuf(response->mutable_status());
+        return;
+    }
+    for (const auto& rs : captured.rowsets) {
+        RowsetMetaPB meta;
+        rs->rowset_meta()->to_rowset_pb(&meta);
+        response->mutable_rowsets()->Add(std::move(meta));
+    }
+    if (want_delete_bitmap) {
+        // Bind by reference: copying the message would duplicate every key array.
+        const auto& keys_pb = request->delete_bitmap_keys();
+        std::set<DeleteBitmap::BitmapKey> keys;
+        for (int i = 0; i < keys_pb.rowset_ids_size(); ++i) {
+            RowsetId rs_id;
+            rs_id.init(keys_pb.rowset_ids(i));
+            keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i));
+        }
+        *response->mutable_delete_bitmap() = captured.delete_bitmap->diffset(keys).to_pb();
+    }
+    Status::OK().to_protobuf(response->mutable_status());
+}
+
 } // namespace doris
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 66a0f867393..7262f4eed2d 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -240,6 +240,11 @@ public:
                          const PGetBeResourceRequest* request, 
PGetBeResourceResponse* response,
                          google::protobuf::Closure* done) override;
 
+    // Returns the metas of the rowsets this BE captures for the requested tablet and
+    // version range, plus delete-bitmap entries when the request carries delete_bitmap_keys.
+    // The status of the operation is always written to `response`.
+    void get_tablet_rowsets(google::protobuf::RpcController* controller,
+                            const PGetTabletRowsetsRequest* request,
+                            PGetTabletRowsetsResponse* response,
+                            google::protobuf::Closure* done) override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
                                         const PExecPlanFragmentRequest* 
request,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index e9c199074ec..c8008c98525 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -68,7 +68,7 @@
 
 namespace doris::vectorized {
 
-using ReadSource = TabletReader::ReadSource;
+using ReadSource = TabletReadSource;
 
 NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent,
                                NewOlapScanner::Params&& params)
@@ -97,7 +97,8 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* 
parent,
                   .filter_block_conjuncts {},
                   .key_group_cluster_key_idxes {},
           }) {
-    _tablet_reader_params.set_read_source(std::move(params.read_source));
+    _tablet_reader_params.set_read_source(std::move(params.read_source),
+                                          _state->skip_delete_bitmap());
     _is_init = false;
 }
 
@@ -193,12 +194,14 @@ Status NewOlapScanner::init() {
                 
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
             }
 
-            auto st = tablet->capture_rs_readers(_tablet_reader_params.version,
-                                                 &read_source.rs_splits,
-                                                 
_state->skip_missing_version());
-            if (!st.ok()) {
-                LOG(WARNING) << "fail to init reader.res=" << st;
-                return st;
+            auto maybe_read_source = tablet->capture_read_source(
+                    _tablet_reader_params.version,
+                    {.skip_missing_versions = _state->skip_missing_version(),
+                     .enable_fetch_rowsets_from_peers =
+                             config::enable_fetch_rowsets_from_peer_replicas});
+            if (!maybe_read_source) {
+                LOG(WARNING) << "fail to init reader. res=" << 
maybe_read_source.error();
+                return maybe_read_source.error();
             }
             if (config::enable_mow_verbose_log && 
tablet->enable_unique_key_merge_on_write()) {
                 LOG_INFO("finish capture_rs_readers for tablet={}, 
query_id={}",
@@ -309,7 +312,6 @@ Status NewOlapScanner::_init_tablet_reader_params(
               std::inserter(_tablet_reader_params.function_filters,
                             _tablet_reader_params.function_filters.begin()));
 
-    auto& tablet = _tablet_reader_params.tablet;
     auto& tablet_schema = _tablet_reader_params.tablet_schema;
     // Merge the columns in delete predicate that not in latest schema in to 
current tablet schema
     for (auto& del_pred : _tablet_reader_params.delete_predicates) {
@@ -369,10 +371,6 @@ Status NewOlapScanner::_init_tablet_reader_params(
 
     _tablet_reader_params.use_page_cache = _state->enable_page_cache();
 
-    if (tablet->enable_unique_key_merge_on_write() && 
!_state->skip_delete_bitmap()) {
-        _tablet_reader_params.delete_bitmap = 
&tablet->tablet_meta()->delete_bitmap();
-    }
-
     DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", 
DBUG_BLOCK);
 
     if (!_state->skip_storage_engine_merge()) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index fd1246b120b..a997ae2adf5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -62,7 +62,7 @@ public:
         std::vector<OlapScanRange*> key_ranges;
         BaseTabletSPtr tablet;
         int64_t version;
-        TabletReader::ReadSource read_source;
+        TabletReadSource read_source;
         int64_t limit;
         bool aggregation;
     };
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index 7f6aadd6070..d3e9115baf1 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -996,7 +996,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
         StorageReadOptions opts;
         opts.stats = &stats;
         opts.tablet_schema = rowset1->tablet_schema();
-        opts.delete_bitmap.emplace(0, 
tablet->tablet_meta()->delete_bitmap().get_agg(
+        opts.delete_bitmap.emplace(0, 
tablet->tablet_meta()->delete_bitmap()->get_agg(
                                               {rowset1->rowset_id(), 0, 
cur_version}));
         std::unique_ptr<RowwiseIterator> iter;
         std::shared_ptr<Schema> schema = 
std::make_shared<Schema>(rowset1->tablet_schema());
@@ -1024,7 +1024,7 @@ TEST_F(TestDeltaWriter, 
vec_sequence_col_concurrent_write) {
         StorageReadOptions opts;
         opts.stats = &stats;
         opts.tablet_schema = rowset2->tablet_schema();
-        opts.delete_bitmap.emplace(0, 
tablet->tablet_meta()->delete_bitmap().get_agg(
+        opts.delete_bitmap.emplace(0, 
tablet->tablet_meta()->delete_bitmap()->get_agg(
                                               {rowset2->rowset_id(), 0, 
cur_version}));
         std::unique_ptr<RowwiseIterator> iter;
         std::shared_ptr<Schema> schema = 
std::make_shared<Schema>(rowset2->tablet_schema());
diff --git a/be/test/olap/segcompaction_mow_test.cpp 
b/be/test/olap/segcompaction_mow_test.cpp
index 62a3232889d..efe40dcb859 100644
--- a/be/test/olap/segcompaction_mow_test.cpp
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -237,7 +237,7 @@ protected:
         std::vector<uint32_t> return_columns = {0, 1, 2};
         reader_context.return_columns = &return_columns;
         reader_context.stats = &_stats;
-        reader_context.delete_bitmap = delete_bitmap.get();
+        reader_context.delete_bitmap = delete_bitmap;
 
         std::vector<uint32_t> segment_num_rows;
         Status s;
diff --git a/be/test/olap/tablet_meta_test.cpp 
b/be/test/olap/tablet_meta_test.cpp
index b85c63ef714..b350e5e0061 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -73,23 +73,23 @@ TEST(TabletMetaTest, TestReviseMeta) {
     }
     ASSERT_EQ(4, tablet_meta.all_rs_metas().size());
 
-    tablet_meta.delete_bitmap().add({rsids[0], 1, 1}, 1);
-    tablet_meta.delete_bitmap().add({rsids[1], 0, 2}, 2);
-    tablet_meta.delete_bitmap().add({rsids[2], 1, 1}, 1);
-    tablet_meta.delete_bitmap().add({rsids[3], 0, 2}, 3);
-    tablet_meta.delete_bitmap().add({rsids[3], 0, 4}, 4);
-    ASSERT_EQ(5, tablet_meta.delete_bitmap().delete_bitmap.size());
+    tablet_meta.delete_bitmap()->add({rsids[0], 1, 1}, 1);
+    tablet_meta.delete_bitmap()->add({rsids[1], 0, 2}, 2);
+    tablet_meta.delete_bitmap()->add({rsids[2], 1, 1}, 1);
+    tablet_meta.delete_bitmap()->add({rsids[3], 0, 2}, 3);
+    tablet_meta.delete_bitmap()->add({rsids[3], 0, 4}, 4);
+    ASSERT_EQ(5, tablet_meta.delete_bitmap()->delete_bitmap.size());
 
     std::vector<RowsetMetaSharedPtr> new_rowsets;
     new_rowsets.push_back(src_rowsets[2]->rowset_meta());
     new_rowsets.push_back(src_rowsets[3]->rowset_meta());
     tablet_meta.revise_rs_metas(std::move(new_rowsets));
     // Take a snapshot with max_version=3.
-    DeleteBitmap snap = tablet_meta.delete_bitmap().snapshot(3);
+    DeleteBitmap snap = tablet_meta.delete_bitmap()->snapshot(3);
     tablet_meta.revise_delete_bitmap_unlocked(snap);
     ASSERT_EQ(2, tablet_meta.all_rs_metas().size());
-    ASSERT_EQ(2, tablet_meta.delete_bitmap().delete_bitmap.size());
-    for (auto entry : tablet_meta.delete_bitmap().delete_bitmap) {
+    ASSERT_EQ(2, tablet_meta.delete_bitmap()->delete_bitmap.size());
+    for (auto entry : tablet_meta.delete_bitmap()->delete_bitmap) {
         RowsetId rsid = std::get<0>(entry.first);
         ASSERT_TRUE(rsid == rsids[2] || rsid == rsids[3]);
         int64_t version = std::get<2>(entry.first);
diff --git a/be/test/olap/test_data/rowset_meta.json 
b/be/test/olap/test_data/rowset_meta.json
index d446e2a34e9..4fe585978aa 100644
--- a/be/test/olap/test_data/rowset_meta.json
+++ b/be/test/olap/test_data/rowset_meta.json
@@ -12,6 +12,10 @@
     "data_disk_size": 0,
     "index_disk_size": 0,
     "empty": true,
+    "load_id": {
+        "hi": 0,
+        "lo": 0
+    },
     "creation_time": 1552911435,
     "tablet_uid": {
         "hi": 10,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 358fc1023b2..fce12193e35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -602,6 +602,22 @@ public class CloudReplica extends Replica {
         secondaryClusterToBackends.remove(cluster);
     }
 
+    /**
+     * Collects, for every cluster recorded in {@code primaryClusterToBackends}, the backend
+     * named by the first id of that cluster's backend list.
+     *
+     * <p>Clusters whose list is null or empty, whose first id is null or -1, or whose backend
+     * can no longer be resolved are skipped, so the result never contains null.
+     *
+     * @return a new mutable list of resolved backends; empty when no cluster qualifies
+     */
+    public List<Backend> getAllPrimaryBes() {
+        List<Backend> result = new ArrayList<>();
+        // Walk the values directly: keySet() followed by get() costs a second lookup per
+        // cluster and can observe a key whose mapping is already gone.
+        for (List<Long> backendIds : primaryClusterToBackends.values()) {
+            if (backendIds == null || backendIds.isEmpty()) {
+                continue;
+            }
+            Long beId = backendIds.get(0);
+            if (beId == null || beId == -1) {
+                continue;
+            }
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            // getBackend() may return null (its other callers null-check the result); keep
+            // such entries out of the list instead of relying on every caller to filter them.
+            if (backend != null) {
+                result.add(backend);
+            }
+        }
+        return result;
+    }
+
     // ATTN: This func is only used by redundant tablet report clean in bes.
     // Only the master node will do the diff logic,
     // so just only need to clean up secondaryClusterToBackends on the master 
node.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 92cfe98e535..ddb49a0ee0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.catalog.View;
 import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
 import org.apache.doris.cluster.ClusterNamespace;
@@ -2756,15 +2757,24 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     LOG.warn("replica {} not normal", replica.getId());
                     continue;
                 }
-                Backend backend = 
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
-                if (backend != null) {
-                    TReplicaInfo replicaInfo = new TReplicaInfo();
-                    replicaInfo.setHost(backend.getHost());
-                    replicaInfo.setBePort(backend.getBePort());
-                    replicaInfo.setHttpPort(backend.getHttpPort());
-                    replicaInfo.setBrpcPort(backend.getBrpcPort());
-                    replicaInfo.setReplicaId(replica.getId());
-                    replicaInfos.add(replicaInfo);
+                List<Backend> backends;
+                if (Config.isCloudMode()) {
+                    CloudReplica cloudReplica = (CloudReplica) replica;
+                    backends = cloudReplica.getAllPrimaryBes();
+                } else {
+                    Backend backend = 
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
+                    backends = Lists.newArrayList(backend);
+                }
+                for (Backend backend : backends) {
+                    if (backend != null) {
+                        TReplicaInfo replicaInfo = new TReplicaInfo();
+                        replicaInfo.setHost(backend.getHost());
+                        replicaInfo.setBePort(backend.getBePort());
+                        replicaInfo.setHttpPort(backend.getHttpPort());
+                        replicaInfo.setBrpcPort(backend.getBrpcPort());
+                        replicaInfo.setReplicaId(replica.getId());
+                        replicaInfos.add(replicaInfo);
+                    }
                 }
             }
             tabletReplicaInfos.put(tabletId, replicaInfos);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 150c07fcf9a..20de386b3eb 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -989,6 +989,21 @@ message PGetBeResourceResponse {
     optional PGlobalResourceUsage global_be_resource_usage = 2;
 }
 
+// Request of PBackendService.get_tablet_rowsets: asks a BE for the rowsets it can capture
+// for versions [version_start, version_end] of one tablet.
+message PGetTabletRowsetsRequest {
+    // The BE-side handler answers InvalidArgument unless the next three fields are all set.
+    optional int64 tablet_id = 1;
+    optional int64 version_start = 2;
+    optional int64 version_end = 3;
+
+    // Delete-bitmap keys as index-aligned rowset_ids / segment_ids / versions. When set, the
+    // handler replies with DeleteBitmap::diffset() of its captured bitmap against these keys.
+    optional DeleteBitmapPB delete_bitmap_keys = 4;
+}
+
+// Response of PBackendService.get_tablet_rowsets.
+message PGetTabletRowsetsResponse {
+    // Outcome of the request; the BE-side handler writes it on every return path.
+    required PStatus status = 1;
+    // Metas of the captured rowsets; filled only after the capture succeeded.
+    repeated RowsetMetaPB rowsets = 2;
+
+    // Filled only when the request carried delete_bitmap_keys.
+    optional DeleteBitmapPB delete_bitmap = 3;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -1041,5 +1056,6 @@ service PBackendService {
     rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns 
(PJdbcTestConnectionResult);
     rpc alter_vault_sync(PAlterVaultSyncRequest) returns 
(PAlterVaultSyncResponse);
     rpc get_be_resource(PGetBeResourceRequest) returns 
(PGetBeResourceResponse);
+    rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns 
(PGetTabletRowsetsResponse);
 };
 
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
new file mode 100644
index 00000000000..78964812ebf
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
 differ
diff --git a/regression-test/pipeline/p0/conf/be.conf 
b/regression-test/pipeline/p0/conf/be.conf
index 5627f048905..9b0bc0f8e71 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -80,3 +80,5 @@ large_cumu_compaction_task_min_thread_num=3
 
 # This feature has bug, so by default is false, only open it in pipeline to 
observe
 enable_parquet_page_index=true
+
+enable_fetch_rowsets_from_peer_replicas = true
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
new file mode 100644
index 00000000000..e1c6669b11b
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.NodeType
+
+// Cloud-mode regression suite: reads a merge-on-write table while debug points make the
+// rowset capture fail, and checks when the query still succeeds and when it must fail with
+// "version already merged, meet error during remote capturing rowsets".
+suite("test_cloud_version_already_merged", "nonConcurrent") {
+    // The scenarios below only apply to cloud mode.
+    if (!isCloudMode()) {
+        return
+    }
+    def tblName = "test_cloud_version_already_merged"
+    sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tblName} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1");
+        """
+
+    // Ten single-row loads: keys 1..5 are written once with negative values and then
+    // overwritten with positive values.
+    sql "insert into ${tblName} values(1,-1,-1,-1);"
+    sql "insert into ${tblName} values(2,-2,-2,-2);"
+    sql "insert into ${tblName} values(3,-3,-3,-3);"
+    sql "insert into ${tblName} values(4,-4,-4,-4)"
+    sql "insert into ${tblName} values(5,-5,-5,-5)"
+    sql "insert into ${tblName} values(1,1,1,1);"
+    sql "insert into ${tblName} values(2,2,2,2);"
+    sql "insert into ${tblName} values(3,3,3,3);"
+    sql "insert into ${tblName} values(4,4,4,4)"
+    sql "insert into ${tblName} values(5,5,5,5)"
+
+
+    sql "sync;"
+    // Baseline result without any fault injection.
+    qt_sql "select * from ${tblName} order by k1;"
+
+    // Locate the backend that hosts the table's single tablet (BUCKETS 1).
+    def backends = sql_return_maparray('show backends')
+    def tabletStats = sql_return_maparray("show tablets from ${tblName};")
+    assert tabletStats.size() == 1
+    def tabletId = tabletStats[0].TabletId
+    def tabletBackendId = tabletStats[0].BackendId
+    def tabletBackend
+    for (def be : backends) {
+        if (be.BackendId == tabletBackendId) {
+            tabletBackend = be
+            break;
+        }
+    }
+    logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");     
+
+    // Start from a clean slate in case an earlier suite left debug points enabled.
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    // Scenario 1: capture failure injected only on the tablet's backend with
+    // skip_by_option: true, and peers resolved to the local host. The query is expected to
+    // succeed and match the recorded output.
+    try {
+        GetDebugPoint().enableDebugPoint(tabletBackend.Host, 
tabletBackend.HttpPort as int, NodeType.BE, 
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, 
skip_by_option: true])
+        
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+        qt_sql """ SELECT * from ${tblName} ORDER BY k1 """
+
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    // Scenario 2: capture failure injected on all BEs, this time without skip_by_option.
+    // The query is expected to fail with the "version already merged" error.
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure",
 [tablet_id: tabletId])
+        
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+        test {
+            sql """ SELECT * from ${tblName} ORDER BY k1 """
+            exception "version already merged, meet error during remote 
capturing rowsets"
+        }
+
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    // Scenario 3: same setup as scenario 1 plus GetRowsetCntl::start_req_bg.inject_failure
+    // on all BEs. The query is expected to fail with the same error.
+    try {
+        GetDebugPoint().enableDebugPoint(tabletBackend.Host, 
tabletBackend.HttpPort as int, NodeType.BE, 
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, 
skip_by_option: true])
+        
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+        
GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure");
+
+        test {
+            sql """ SELECT * from ${tblName} ORDER BY k1 """
+            exception "version already merged, meet error during remote 
capturing rowsets"
+        }
+
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    // Scenario 4: same setup as scenario 1 plus
+    // Tablet::_remote_get_rowsets_meta.inject_replica_address_fail on all BEs. The query is
+    // expected to fail with the same error.
+    try {
+        GetDebugPoint().enableDebugPoint(tabletBackend.Host, 
tabletBackend.HttpPort as int, NodeType.BE, 
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId, 
skip_by_option: true])
+        
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+        
GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail");
+
+        test {
+            sql """ SELECT * from ${tblName} ORDER BY k1 """
+            exception "version already merged, meet error during remote 
capturing rowsets"
+        }
+
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to