This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 96711ec2344 branch-3.1: [opt](cloud) Reduce empty rowset pressure on meta service #54395 #55171 #55604 #55742 #55837 (#55934)
96711ec2344 is described below

commit 96711ec234433a9d187b16b7a3f04c95beee1baa
Author: Xin Liao <[email protected]>
AuthorDate: Mon Sep 15 11:58:17 2025 +0800

    branch-3.1: [opt](cloud) Reduce empty rowset pressure on meta service #54395 #55171 #55604 #55742 #55837 (#55934)
    
    pick from: #54395 #55171 #55604 #55742 #55837
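    
    This backport reduces meta-service load from empty rowsets: the BE can skip the
    prepare/commit rowset RPCs for rowsets that carry no data and, when syncing tablet
    rowsets, it fills the resulting version gaps locally with placeholder "hole" rowsets
    so the version sequence stays continuous. As a rough illustration only, here is a
    minimal standalone C++ sketch of the gap-filling idea (not the Doris code; the
    function and type names below are invented for this example):
    
        #include <cstdint>
        #include <iostream>
        #include <utility>
        #include <vector>
        
        using Version = std::pair<int64_t, int64_t>; // [start, end] of a rowset
        
        // Return the single versions that have no rowset yet, given the sorted
        // versions a tablet already holds and the partition's max version.
        std::vector<int64_t> missing_versions(const std::vector<Version>& existing,
                                              int64_t partition_max_version) {
            std::vector<int64_t> holes;
            int64_t last = -1;
            for (const auto& v : existing) { // assumed sorted by start version
                for (int64_t ver = last + 1; ver < v.first; ++ver) {
                    holes.push_back(ver); // gap between two existing rowsets
                }
                last = v.second;
            }
            for (int64_t ver = last + 1; ver <= partition_max_version; ++ver) {
                holes.push_back(ver); // trailing gap up to the partition max version
            }
            return holes;
        }
        
        int main() {
            // Versions 0, 2 and 5 exist and the partition max version is 6,
            // so the holes to fill are 1, 3, 4 and 6.
            for (int64_t v : missing_versions({{0, 0}, {2, 2}, {5, 5}}, 6)) {
                std::cout << v << ' ';
            }
            std::cout << '\n';
        }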
---
 be/src/cloud/cloud_base_compaction.cpp             |   2 +-
 be/src/cloud/cloud_cumulative_compaction.cpp       |   2 +-
 be/src/cloud/cloud_delta_writer.cpp                |  27 +-
 be/src/cloud/cloud_delta_writer.h                  |   3 +
 be/src/cloud/cloud_full_compaction.cpp             |   2 +-
 be/src/cloud/cloud_meta_mgr.cpp                    | 124 ++++-
 be/src/cloud/cloud_meta_mgr.h                      |  11 +
 be/src/cloud/cloud_rowset_builder.cpp              |   4 +-
 be/src/cloud/cloud_rowset_builder.h                |   6 +
 be/src/cloud/cloud_schema_change_job.cpp           |   1 +
 be/src/cloud/cloud_tablet.cpp                      |   9 +-
 be/src/cloud/cloud_tablets_channel.cpp             |   7 +-
 be/src/cloud/config.cpp                            |   2 +
 be/src/cloud/config.h                              |   3 +
 be/src/common/config.cpp                           |   2 +
 be/src/olap/compaction.cpp                         |  55 ++-
 be/src/olap/compaction.h                           |   4 +
 be/src/olap/rowset/rowset.h                        |  10 +
 be/src/olap/rowset/rowset_meta.h                   |   4 +
 be/test/cloud/cloud_compaction_test.cpp            | 190 +++++++-
 be/test/cloud/cloud_meta_mgr_test.cpp              | 536 +++++++++++++++++++++
 cloud/src/meta-service/meta_service.cpp            |   2 +-
 cloud/src/meta-service/meta_service_job.cpp        |   4 +-
 cloud/src/recycler/recycler.cpp                    |  28 +-
 cloud/test/recycler_test.cpp                       |   1 +
 gensrc/proto/cloud.proto                           |   4 +-
 .../test_compaction_with_empty_rowset.out          | Bin 0 -> 349 bytes
 .../test_schema_change_mow_with_empty_rowset.out   | Bin 0 -> 5653 bytes
 .../pipeline/cloud_p0/conf/be_custom.conf          |   2 +-
 .../test_compaction_with_empty_rowset.groovy       | 101 ++++
 ...test_schema_change_mow_with_empty_rowset.groovy |  80 +++
 31 files changed, 1186 insertions(+), 40 deletions(-)

diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index 5e00721bfc7..bd2d07d41b9 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -344,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
     compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
     compaction_job->set_num_input_segments(_input_segments);
     compaction_job->set_num_output_segments(_output_rowset->num_segments());
-    compaction_job->set_num_input_rowsets(_input_rowsets.size());
+    compaction_job->set_num_input_rowsets(num_input_rowsets());
     compaction_job->set_num_output_rowsets(1);
     compaction_job->add_input_versions(_input_rowsets.front()->start_version());
     compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index d2ea6ac0e8a..102129bdd89 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -270,7 +270,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
     compaction_job->set_num_input_segments(_input_segments);
     compaction_job->set_num_output_segments(_output_rowset->num_segments());
-    compaction_job->set_num_input_rowsets(_input_rowsets.size());
+    compaction_job->set_num_input_rowsets(num_input_rowsets());
     compaction_job->set_num_output_rowsets(1);
     compaction_job->add_input_versions(_input_rowsets.front()->start_version());
     compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp
index 2533028314f..e0f9d203750 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -20,6 +20,7 @@
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_rowset_builder.h"
 #include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
 #include "olap/delta_writer.h"
 #include "runtime/thread_context.h"
 
@@ -108,11 +109,31 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {
 
 Status CloudDeltaWriter::commit_rowset() {
     std::lock_guard<bthread::Mutex> lock(_mtx);
+
+    // Handle empty rowset (no data written)
     if (!_is_init) {
-        // No data to write, but still need to write a empty rowset kv to keep version continuous
-        RETURN_IF_ERROR(_rowset_builder->init());
-        RETURN_IF_ERROR(_rowset_builder->build_rowset());
+        return _commit_empty_rowset();
+    }
+
+    // Handle normal rowset with data
+    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
+}
+
+Status CloudDeltaWriter::_commit_empty_rowset() {
+    // If skip writing empty rowset metadata is enabled,
+    // we do not prepare rowset to meta service.
+    if (config::skip_writing_empty_rowset_metadata) {
+        rowset_builder()->set_skip_writing_rowset_metadata(true);
+    }
+
+    RETURN_IF_ERROR(_rowset_builder->init());
+    RETURN_IF_ERROR(_rowset_builder->build_rowset());
+
+    // If skip writing empty rowset metadata is enabled, we do not commit rowset to meta service.
+    if (config::skip_writing_empty_rowset_metadata) {
+        return Status::OK();
     }
+    // write an empty rowset kv to keep version continuous
     return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
 }
 
diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h
index 4558b04acd1..005be63feb2 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -58,6 +58,9 @@ private:
     // Convert `_rowset_builder` from `BaseRowsetBuilder` to `CloudRowsetBuilder`
     CloudRowsetBuilder* rowset_builder();
 
+    // Handle commit for empty rowset (when no data is written)
+    Status _commit_empty_rowset();
+
     bthread::Mutex _mtx;
     CloudStorageEngine& _engine;
     QueryThreadContext _query_thread_context;
diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp
index ac60c869154..d0d756cb7ea 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -235,7 +235,7 @@ Status CloudFullCompaction::modify_rowsets() {
     })
     compaction_job->set_num_input_segments(_input_segments);
     compaction_job->set_num_output_segments(_output_rowset->num_segments());
-    compaction_job->set_num_input_rowsets(_input_rowsets.size());
+    compaction_job->set_num_input_rowsets(num_input_rowsets());
     compaction_job->set_num_output_rowsets(1);
     compaction_job->add_input_versions(_input_rowsets.front()->start_version());
     compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index fbaf2daf9a3..1364cab47fb 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -160,6 +160,7 @@ bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
        "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30);
 bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
         "cloud_be_mow_get_dbm_lock_backoff_sleep_time");
+bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
 
 class MetaServiceProxy {
 public:
@@ -768,6 +769,12 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
                tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, warmup_delta_data);
                 RETURN_IF_ERROR(tablet->merge_rowsets_schema());
             }
+
+            // Fill version holes
+            int64_t partition_max_version =
+                    resp.has_partition_max_version() ? resp.partition_max_version() : -1;
+            RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock));
+
             tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
             tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
             tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
@@ -1675,5 +1682,120 @@ Status CloudMetaMgr::get_schema_dict(int64_t index_id,
     return Status::OK();
 }
 
-#include "common/compile_check_end.h"
+Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version,
+                                        std::unique_lock<std::shared_mutex>& wlock) {
+    if (max_version <= 0) {
+        return Status::OK();
+    }
+
+    Versions existing_versions;
+    for (const auto& rs : tablet->tablet_meta()->all_rs_metas()) {
+        existing_versions.emplace_back(rs->version());
+    }
+
+    // If there are no existing versions, it may be a new tablet for restore, so skip filling holes.
+    if (existing_versions.empty()) {
+        return Status::OK();
+    }
+
+    std::vector<RowsetSharedPtr> hole_rowsets;
+    // sort the existing versions in ascending order
+    std::sort(existing_versions.begin(), existing_versions.end(),
+              [](const Version& a, const Version& b) {
+                  // simple because 2 versions are certainly not overlapping
+                  return a.first < b.first;
+              });
+
+    int64_t last_version = -1;
+    for (const Version& version : existing_versions) {
+        // missing versions are those that are not in the existing_versions
+        if (version.first > last_version + 1) {
+            // there is a hole between versions
+            auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
+            for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
+                RowsetSharedPtr hole_rowset;
+                RETURN_IF_ERROR(create_empty_rowset_for_hole(
+                        tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
+                hole_rowsets.push_back(hole_rowset);
+            }
+            LOG(INFO) << "Created empty rowset for version hole, from " << 
last_version + 1
+                      << " to " << version.first - 1 << " for tablet " << 
tablet->tablet_id();
+        }
+        last_version = version.second;
+    }
+
+    if (last_version + 1 <= max_version) {
+        LOG(INFO) << "Created empty rowset for version hole, from " << 
last_version + 1 << " to "
+                  << max_version << " for tablet " << tablet->tablet_id();
+        for (; last_version + 1 <= max_version; ++last_version) {
+            RowsetSharedPtr hole_rowset;
+            auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back());
+            RETURN_IF_ERROR(create_empty_rowset_for_hole(
+                    tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
+            hole_rowsets.push_back(hole_rowset);
+        }
+    }
+
+    if (!hole_rowsets.empty()) {
+        size_t hole_count = hole_rowsets.size();
+        tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
+        g_cloud_version_hole_filled_count << hole_count;
+    }
+    return Status::OK();
+}
+
+Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
+                                                  RowsetMetaSharedPtr prev_rowset_meta,
+                                                  RowsetSharedPtr* rowset) {
+    // Create a RowsetMeta for the empty rowset
+    auto rs_meta = std::make_shared<RowsetMeta>();
+
+    // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id)
+    RowsetId hole_rowset_id;
+    hole_rowset_id.init(2, 0, tablet->tablet_id(), version);
+    rs_meta->set_rowset_id(hole_rowset_id);
+
+    // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id)
+    PUniqueId load_id;
+    load_id.set_hi(tablet->tablet_id());
+    load_id.set_lo(version);
+    rs_meta->set_load_id(load_id);
+
+    // Copy schema and other metadata from template
+    rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema());
+    rs_meta->set_rowset_type(prev_rowset_meta->rowset_type());
+    rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
+    rs_meta->set_resource_id(prev_rowset_meta->resource_id());
+
+    // Basic tablet information
+    rs_meta->set_tablet_id(tablet->tablet_id());
+    rs_meta->set_index_id(tablet->index_id());
+    rs_meta->set_partition_id(tablet->partition_id());
+    rs_meta->set_tablet_uid(tablet->tablet_uid());
+    rs_meta->set_version(Version(version, version));
+    rs_meta->set_txn_id(version);
+
+    rs_meta->set_num_rows(0);
+    rs_meta->set_total_disk_size(0);
+    rs_meta->set_data_disk_size(0);
+    rs_meta->set_index_disk_size(0);
+    rs_meta->set_empty(true);
+    rs_meta->set_num_segments(0);
+    rs_meta->set_segments_overlap(NONOVERLAPPING);
+    rs_meta->set_rowset_state(VISIBLE);
+    rs_meta->set_creation_time(UnixSeconds());
+    rs_meta->set_newest_write_timestamp(UnixSeconds());
+
+    Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset);
+    if (!s.ok()) {
+        LOG_WARNING("Failed to create empty rowset for hole")
+                .tag("tablet_id", tablet->tablet_id())
+                .tag("version", version)
+                .error(s);
+        return s;
+    }
+    (*rowset)->set_hole_rowset(true);
+
+    return Status::OK();
+}
 } // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 2794a0a815e..bd3acbf8215 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/rowset_meta.h"
 #include "util/s3_util.h"
 
@@ -151,6 +152,15 @@ public:
     void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator,
                                           int64_t tablet_id);
 
+    // Fill version holes by creating empty rowsets for missing versions
+    Status fill_version_holes(CloudTablet* tablet, int64_t max_version,
+                              std::unique_lock<std::shared_mutex>& wlock);
+
+    // Create an empty rowset to fill a version hole
+    Status create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
+                                        RowsetMetaSharedPtr prev_rowset_meta,
+                                        RowsetSharedPtr* rowset);
+
 private:
     bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
                                             std::ranges::range auto&& rs_metas,
@@ -160,6 +170,7 @@ private:
                                     std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
                                     const TabletIndexPB& idx, DeleteBitmap* delete_bitmap,
                                     bool full_sync = false, SyncRowsetStats* sync_stats = nullptr);
+
     void check_table_size_correctness(const RowsetMeta& rs_meta);
     int64_t get_segment_file_size(const RowsetMeta& rs_meta);
     int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp
index 3da6a55aa44..a8c0ad87980 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -80,7 +80,9 @@ Status CloudRowsetBuilder::init() {
 
     _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token();
 
-    RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), ""));
+    if (!_skip_writing_rowset_metadata) {
+        RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), ""));
+    }
 
     _is_init = true;
     return Status::OK();
diff --git a/be/src/cloud/cloud_rowset_builder.h b/be/src/cloud/cloud_rowset_builder.h
index 05e24e66382..afa5d7c7574 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -39,6 +39,8 @@ public:
 
     Status set_txn_related_delete_bitmap();
 
+    void set_skip_writing_rowset_metadata(bool skip) { _skip_writing_rowset_metadata = skip; }
+
 private:
     // Convert `_tablet` from `BaseTablet` to `CloudTablet`
     CloudTablet* cloud_tablet();
@@ -46,6 +48,10 @@ private:
     Status check_tablet_version_count();
 
     CloudStorageEngine& _engine;
+
+    // whether to skip writing rowset metadata to meta service.
+    // This is used for empty rowsets when config::skip_writing_empty_rowset_metadata is true.
+    bool _skip_writing_rowset_metadata = false;
 };
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 9e8d3f85a08..ec4c8224050 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -459,6 +459,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
     TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
     tmp_meta->delete_bitmap()->delete_bitmap.clear();
+    tmp_meta->clear_rowsets();
     std::shared_ptr<CloudTablet> tmp_tablet =
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 23232fb1563..d38ae02ae25 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -661,7 +661,6 @@ std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
 
 void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
                                           int64_t num_rows, int64_t data_size) {
-    _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
     _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
     _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
     _approximate_data_size.store(data_size, std::memory_order_relaxed);
@@ -672,10 +671,16 @@ void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segme
         if (v.second < cp) {
             continue;
         }
-
         cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
         ++cumu_num_rowsets;
     }
+    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
+    // in the version map, so we use the max value to ensure that the approximate number
+    // of rowsets is at least the size of _rs_version_map.
+    // Note that this is not the exact number of rowsets, but an approximate number.
+    int64_t approximate_num_rowsets =
+            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
+    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
     _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
     _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
 }
diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp
index c38680a81dc..1f99fd53a8b 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -20,6 +20,7 @@
 #include "cloud/cloud_delta_writer.h"
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
 #include "olap/delta_writer.h"
 #include "runtime/tablets_channel.h"
 
@@ -59,6 +60,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
     _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
 
     std::unordered_set<int64_t> partition_ids;
+    std::vector<CloudDeltaWriter*> writers;
     {
         // add_batch may concurrency with inc_open but not under _lock.
         // so need to protect it with _tablet_writers_lock.
@@ -69,8 +71,11 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
                 return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
             }
             partition_ids.insert(tablet_writer_it->second->partition_id());
+            writers.push_back(static_cast<CloudDeltaWriter*>(tablet_writer_it->second.get()));
         }
-        if (!partition_ids.empty()) {
+        if (config::skip_writing_empty_rowset_metadata && !writers.empty()) {
+            RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers));
+        } else if (!partition_ids.empty()) {
             RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
         }
     }
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 3db8a758b27..16d6aa7f782 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -90,6 +90,8 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");
 
 DEFINE_Bool(enable_check_storage_vault, "true");
 
+DEFINE_mBool(skip_writing_empty_rowset_metadata, "false");
+
 DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600");
 
 DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000");
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 18a6097c56b..a52ac758e67 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -107,6 +107,9 @@ DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
 
 DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
 
+// Skip writing empty rowset metadata to meta service
+DECLARE_mBool(skip_writing_empty_rowset_metadata);
+
 // enable large txn lazy commit in meta-service `commit_txn`
 DECLARE_mBool(enable_cloud_txn_lazy_commit);
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index aba7561d1a3..243cd65351a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -2018,6 +2018,8 @@ Status set_fuzzy_configs() {
             ((distribution(*generator) % 2) == 0) ? "10" : "4294967295";
     fuzzy_field_and_value["max_segment_partial_column_cache_size"] =
             ((distribution(*generator) % 2) == 0) ? "5" : "10";
+    fuzzy_field_and_value["skip_writing_empty_rowset_metadata"] =
+            ((distribution(*generator) % 2) == 0) ? "true" : "false";
 
     std::uniform_int_distribution<int64_t> distribution2(-2, 10);
     fuzzy_field_and_value["segments_key_bounds_truncation_threshold"] =
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a3ccc407078..34ddf37e5e3 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1431,6 +1431,46 @@ Status CloudCompactionMixin::modify_rowsets() {
     return Status::OK();
 }
 
+Status CloudCompactionMixin::set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx) {
+    // Set storage resource from input rowsets by iterating backwards to find the first rowset
+    // with non-empty resource_id. This handles two scenarios:
+    // 1. Hole rowsets compaction: Multiple hole rowsets may lack storage resource.
+    //    Example: [0-1, 2-2, 3-3, 4-4, 5-5] where 2-5 are hole rowsets.
+    //    If 0-1 lacks resource_id, then 2-5 also lack resource_id.
+    // 2. Schema change: New tablet may have later version empty rowsets without resource_id,
+    //    but middle rowsets get resource_id after historical rowsets are converted.
+    //    We iterate backwards to find the most recent rowset with valid resource_id.
+
+    for (const auto& rowset : std::ranges::reverse_view(_input_rowsets)) {
+        const auto& resource_id = rowset->rowset_meta()->resource_id();
+
+        if (!resource_id.empty()) {
+            ctx.storage_resource = *DORIS_TRY(rowset->rowset_meta()->remote_storage_resource());
+            return Status::OK();
+        }
+
+        // Validate that non-empty rowsets (num_segments > 0) must have valid resource_id
+        // Only hole rowsets or empty rowsets are allowed to have empty resource_id
+        if (rowset->num_segments() > 0) {
+            auto error_msg = fmt::format(
+                    "Non-empty rowset must have valid resource_id. "
+                    "rowset_id={}, version=[{}-{}], is_hole_rowset={}, 
num_segments={}, "
+                    "tablet_id={}, table_id={}",
+                    rowset->rowset_id().to_string(), rowset->start_version(), 
rowset->end_version(),
+                    rowset->is_hole_rowset(), rowset->num_segments(), 
_tablet->tablet_id(),
+                    _tablet->table_id());
+
+#ifndef BE_TEST
+            DCHECK(false) << error_msg;
+#endif
+
+            return Status::InternalError<false>(error_msg);
+        }
+    }
+
+    return Status::OK();
+}
+
 Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
     // only do index compaction for dup_keys and unique_keys with mow enabled
     if (config::inverted_index_compaction_enable &&
@@ -1440,9 +1480,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
         construct_index_compaction_columns(ctx);
     }
 
-    // Use the storage resource of the previous rowset
-    ctx.storage_resource =
-            *DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource());
+    // Use the storage resource of the previous rowset.
+    RETURN_IF_ERROR(set_storage_resource_from_input_rowsets(ctx));
 
     ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
                  std::numeric_limits<int64_t>::max(); // MUST be positive
@@ -1492,4 +1531,14 @@ Status CloudCompactionMixin::garbage_collection() {
     return Status::OK();
 }
 
+// should skip hole rowsets, otherwise the count will be wrong in the meta service
+int64_t CloudCompactionMixin::num_input_rowsets() const {
+    int64_t count = 0;
+    for (const auto& r : _input_rowsets) {
+        if (!r->is_hole_rowset()) {
+            count++;
+        }
+    }
+    return count;
+}
 } // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index de295a8e7ff..f2b9e86c4bb 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -189,6 +189,8 @@ public:
 
     int64_t initiator() const;
 
+    int64_t num_input_rowsets() const;
+
 protected:
     CloudTablet* cloud_tablet() { return static_cast<CloudTablet*>(_tablet.get()); }
 
@@ -203,6 +205,8 @@ protected:
 private:
     Status construct_output_rowset_writer(RowsetWriterContext& ctx) override;
 
+    Status set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx);
+
     Status execute_compact_impl(int64_t permits);
 
     Status build_basic_info();
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 3d2a3d965f0..9e4ceb870cc 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -318,6 +318,11 @@ public:
 
     std::vector<std::string> get_index_file_names();
 
+    // check if the rowset is a hole rowset
+    bool is_hole_rowset() const { return _is_hole_rowset; }
+    // set the rowset as a hole rowset
+    void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset = is_hole_rowset; }
+
 protected:
     friend class RowsetFactory;
 
@@ -361,6 +366,11 @@ protected:
 
     // <column_uniq_id>, skip index compaction
     std::set<int32_t> skip_index_compaction;
+
+    // Only used in cloud mode; it indicates whether this rowset is a hole rowset.
+    // A hole rowset is a rowset that has no data but is used to fill a version gap,
+    // ensuring that the version sequence is continuous.
+    bool _is_hole_rowset = false;
 };
 
 // `rs_metas` MUST already be sorted by `RowsetMeta::comparator`
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 1ec5b30f0fe..8d79037b87c 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -68,6 +68,10 @@ public:
 
     const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }
 
+    void set_resource_id(const std::string& resource_id) {
+        _rowset_meta_pb.set_resource_id(resource_id);
+    }
+
     bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
 
     bool has_variant_type_in_schema() const;
diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp
index 37847025c9e..049da4b15ae 100644
--- a/be/test/cloud/cloud_compaction_test.cpp
+++ b/be/test/cloud/cloud_compaction_test.cpp
@@ -27,11 +27,11 @@
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_mgr.h"
-#include "gtest/gtest_pred_impl.h"
 #include "json2pb/json_to_pb.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/storage_policy.h"
 #include "olap/tablet_meta.h"
 #include "util/uid_util.h"
 
@@ -197,4 +197,192 @@ TEST_F(CloudCompactionTest, failure_cumu_compaction_tablet_sleep_test) {
     ASSERT_EQ(st, Status::OK());
     ASSERT_EQ(tablets.size(), 0);
 }
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+                                     int data_size) {
+    auto rs_meta = std::make_shared<RowsetMeta>();
+    rs_meta->set_rowset_type(BETA_ROWSET); // important
+    rs_meta->_rowset_meta_pb.set_start_version(version.first);
+    rs_meta->_rowset_meta_pb.set_end_version(version.second);
+    rs_meta->set_num_segments(num_segments);
+    rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    rs_meta->set_total_disk_size(data_size);
+    RowsetSharedPtr rowset;
+    Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+    if (!st.ok()) {
+        return nullptr;
+    }
+    return rowset;
+}
+
+class TestableCloudCompaction : public CloudCompactionMixin {
+public:
+    TestableCloudCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
+            : CloudCompactionMixin(engine, tablet, "test_compaction") {}
+
+    // Set input rowsets for testing
+    void set_input_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+        _input_rowsets = rowsets;
+    }
+
+    Status prepare_compact() override { return Status::OK(); }
+
+    ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }
+
+    std::string_view compaction_name() const override { return "test_compaction"; }
+};
+
+TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
+    S3Conf s3_conf {.bucket = "bucket",
+                    .prefix = "prefix",
+                    .client_conf = {
+                            .endpoint = "endpoint",
+                            .region = "region",
+                            .ak = "ak",
+                            .sk = "sk",
+                            .token = "",
+                            .bucket = "",
+                            .role_arn = "",
+                            .external_id = "",
+                    }};
+    std::string resource_id = "10000";
+    auto res = io::S3FileSystem::create(std::move(s3_conf), resource_id);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto fs = res.value();
+    StorageResource storage_resource(fs);
+
+    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta);
+    TestableCloudCompaction compaction(_engine, tablet);
+
+    // Test case 1: All rowsets are empty (num_segments = 0) - should succeed
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
+        rowsets.push_back(rowset1);
+
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
+        rowsets.push_back(rowset2);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // No storage resource should be set since no rowset has resource_id
+        ASSERT_FALSE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 2: Backward iteration - last rowset has resource_id
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: empty, no resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->set_hole_rowset(true);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: empty, no resource_id
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true);
+        rowsets.push_back(rowset2);
+
+        // Third rowset: has resource_id (should be found during backward 
iteration)
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // Storage resource should be set from rowset3
+        ASSERT_TRUE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 3: Multiple rowsets with resource_id - should use the last 
one (backward iteration)
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: has resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        StorageResource first_resource(fs);
+        rowset1->rowset_meta()->set_remote_storage_resource(first_resource);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: empty, no resource_id
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true);
+        rowsets.push_back(rowset2);
+
+        // Third rowset: has different resource_id (should be used due to 
backward iteration)
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // Storage resource should be set from rowset3 (last one with 
resource_id)
+        ASSERT_TRUE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 4: Non-empty rowset in the middle without resource_id - 
should fail
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: has resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: non-empty but no resource_id (invalid)
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 2, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        // Intentionally don't set resource_id
+        rowsets.push_back(rowset2);
+
+        // Third rowset: empty, no resource_id
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 0, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->set_hole_rowset(true); // Mark as hole rowset since 
num_segments=0
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.is<ErrorCode::INTERNAL_ERROR>());
+        ASSERT_TRUE(st.to_string().find("Non-empty rowset must have valid resource_id") !=
+                    std::string::npos)
+                << st.to_string();
+    }
+
+    // Test case 5: Empty input rowsets - should succeed
+    {
+        std::vector<RowsetSharedPtr> rowsets; // Empty vector
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // No storage resource should be set
+        ASSERT_FALSE(ctx.storage_resource.has_value());
+    }
+}
 } // namespace doris
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp b/be/test/cloud/cloud_meta_mgr_test.cpp
index b938d949553..b925dd6f645 100644
--- a/be/test/cloud/cloud_meta_mgr_test.cpp
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -21,6 +21,15 @@
 
 #include <chrono>
 #include <memory>
+#include <set>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
 
 namespace doris {
 using namespace cloud;
@@ -88,4 +97,531 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
     // clang-format on
 }
 
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_no_holes) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add consecutive versions: 0, 1, 2, 3, 4
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version = 0; version <= 4; ++version) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1001);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        // Create rowset and add it to tablet
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Test fill_version_holes directly - should not add any rowsets since 
there are no holes
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // Verify tablet still has the same number of rowsets (no holes to fill)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
+    // Verify rows number is correct
+    for (const auto& rs_meta : tablet->tablet_meta()->all_rs_metas()) {
+        EXPECT_EQ(rs_meta->num_rows(), 100);
+    }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_with_holes) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add non-consecutive versions: 0, 2, 4 (missing 1, 3)
+    std::vector<int64_t> versions = {0, 2, 4};
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version : versions) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1001);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        // Create rowset and add it to list
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Initially we have 3 rowsets (versions 0, 2, 4)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+    // Test fill_version_holes directly to fill missing versions 1 and 3
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // After filling holes, we should have 5 rowsets (versions 0, 1, 2, 3, 4)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
+
+    // Verify all versions are present
+    auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+    std::set<int64_t> found_versions;
+    for (const auto& rs_meta : rs_metas) {
+        found_versions.insert(rs_meta->version().first);
+    }
+    EXPECT_EQ(found_versions.size(), 5);
+    EXPECT_TRUE(found_versions.contains(0));
+    EXPECT_TRUE(found_versions.contains(1));
+    EXPECT_TRUE(found_versions.contains(2));
+    EXPECT_TRUE(found_versions.contains(3));
+    EXPECT_TRUE(found_versions.contains(4));
+
+    // Verify the hole rowsets (versions 1 and 3) are empty
+    for (const auto& rs_meta : rs_metas) {
+        if (rs_meta->version().first == 1 || rs_meta->version().first == 3) {
+            EXPECT_TRUE(rs_meta->empty());
+            EXPECT_EQ(rs_meta->num_rows(), 0);
+        } else {
+            EXPECT_FALSE(rs_meta->empty());
+            EXPECT_EQ(rs_meta->num_rows(), 100);
+        }
+    }
+}
+
+// Test create_empty_rowset_for_hole function
+TEST_F(CloudMetaMgrTest, test_create_empty_rowset_for_hole) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Create a previous rowset meta to pass as reference
+    auto prev_rs_meta = std::make_shared<RowsetMeta>();
+    prev_rs_meta->set_tablet_id(1001);
+    prev_rs_meta->set_index_id(2);
+    prev_rs_meta->set_partition_id(15673);
+    prev_rs_meta->set_tablet_uid(UniqueId(9, 10));
+    prev_rs_meta->set_version(Version(1, 1));
+    prev_rs_meta->set_rowset_type(BETA_ROWSET);
+    prev_rs_meta->set_rowset_id(engine.next_rowset_id());
+    prev_rs_meta->set_num_rows(100);
+    prev_rs_meta->set_empty(false);
+    prev_rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+    // Test creating an empty rowset for version hole
+    RowsetSharedPtr hole_rowset;
+    Status status =
+            meta_mgr.create_empty_rowset_for_hole(tablet.get(), 2, 
prev_rs_meta, &hole_rowset);
+    EXPECT_TRUE(status.ok()) << "Failed to create empty rowset for hole: " << 
status;
+    EXPECT_NE(hole_rowset, nullptr);
+
+    // Verify the hole rowset properties
+    auto hole_rs_meta = hole_rowset->rowset_meta();
+    EXPECT_EQ(hole_rs_meta->tablet_id(), 15673);
+    EXPECT_EQ(hole_rs_meta->index_id(), 0);
+    EXPECT_EQ(hole_rs_meta->partition_id(), 2);
+    EXPECT_EQ(hole_rs_meta->tablet_uid(), UniqueId(9, 10));
+    EXPECT_EQ(hole_rs_meta->version(), Version(2, 2));
+    EXPECT_EQ(hole_rs_meta->rowset_type(), BETA_ROWSET);
+    EXPECT_EQ(hole_rs_meta->num_rows(), 0);
+    EXPECT_EQ(hole_rs_meta->total_disk_size(), 0);
+    EXPECT_EQ(hole_rs_meta->data_disk_size(), 0);
+    EXPECT_EQ(hole_rs_meta->index_disk_size(), 0);
+    EXPECT_TRUE(hole_rs_meta->empty());
+    EXPECT_EQ(hole_rs_meta->num_segments(), 0);
+    EXPECT_EQ(hole_rs_meta->segments_overlap(), NONOVERLAPPING);
+    EXPECT_EQ(hole_rs_meta->rowset_state(), VISIBLE);
+    EXPECT_TRUE(hole_rowset->is_hole_rowset());
+    EXPECT_EQ(hole_rowset->txn_id(), 2); // txn_id should match version
+    RowsetId expected_rowset_id;
+    expected_rowset_id.init(2, 0, 15673, 2);
+    EXPECT_EQ(hole_rowset->rowset_meta()->rowset_id(), expected_rowset_id);
+
+    // Test creating multiple hole rowsets with different versions
+    RowsetSharedPtr hole_rowset_v3;
+    status = meta_mgr.create_empty_rowset_for_hole(tablet.get(), 3, 
prev_rs_meta, &hole_rowset_v3);
+    EXPECT_TRUE(status.ok());
+    EXPECT_NE(hole_rowset_v3, nullptr);
+    EXPECT_EQ(hole_rowset_v3->rowset_meta()->version(), Version(3, 3));
+    EXPECT_TRUE(hole_rowset_v3->is_hole_rowset());
+
+    // Verify different hole rowsets have different rowset IDs
+    EXPECT_NE(hole_rowset->rowset_meta()->rowset_id(), 
hole_rowset_v3->rowset_meta()->rowset_id());
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_edge_cases) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    // Test case 1: max_version <= 0
+    {
+        TabletMetaSharedPtr tablet_meta(new TabletMeta(
+                1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
+                TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+        auto tablet =
+                std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+        std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+        Status status = meta_mgr.fill_version_holes(tablet.get(), 0, wlock);
+        EXPECT_TRUE(status.ok());
+
+        status = meta_mgr.fill_version_holes(tablet.get(), -1, wlock);
+        EXPECT_TRUE(status.ok());
+
+        EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
+    }
+
+    // Test case 2: empty tablet (no existing versions)
+    {
+        TabletMetaSharedPtr tablet_meta(new TabletMeta(
+                1002, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
+                TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+        auto tablet =
+                std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+        std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+        Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+        EXPECT_TRUE(status.ok());
+
+        // Should still have no rowsets
+        EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
+    }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_trailing_holes) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1003, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add only versions 0, 1, 2 but max_version is 5 (missing 3, 4, 5)
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version = 0; version <= 2; ++version) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1003);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Initially we have 3 rowsets (versions 0, 1, 2)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+    // Test fill_version_holes to fill trailing holes (versions 3, 4, 5)
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 
5)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
+
+    // Verify all versions are present
+    auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+    std::set<int64_t> found_versions;
+    for (const auto& rs_meta : rs_metas) {
+        found_versions.insert(rs_meta->version().first);
+    }
+    EXPECT_EQ(found_versions.size(), 6);
+    for (int64_t v = 0; v <= 5; ++v) {
+        EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+    }
+
+    // Verify the trailing hole rowsets (versions 3, 4, 5) are empty
+    for (const auto& rs_meta : rs_metas) {
+        if (rs_meta->version().first >= 3) {
+            EXPECT_TRUE(rs_meta->empty())
+                    << "Version " << rs_meta->version().first << " should be 
empty";
+            EXPECT_EQ(rs_meta->num_rows(), 0);
+            EXPECT_EQ(rs_meta->total_disk_size(), 0);
+        } else {
+            EXPECT_FALSE(rs_meta->empty())
+                    << "Version " << rs_meta->version().first << " should not 
be empty";
+            EXPECT_EQ(rs_meta->num_rows(), 100);
+        }
+    }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_single_hole) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1004, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add versions 0, 2 (missing only version 1)
+    std::vector<int64_t> versions = {0, 2};
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version : versions) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1004);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Initially we have 2 rowsets (versions 0, 2)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
+
+    // Test fill_version_holes to fill single hole (version 1)
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 2, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // After filling holes, we should have 3 rowsets (versions 0, 1, 2)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+    // Verify all versions are present
+    auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+    std::set<int64_t> found_versions;
+    for (const auto& rs_meta : rs_metas) {
+        found_versions.insert(rs_meta->version().first);
+    }
+    EXPECT_EQ(found_versions.size(), 3);
+    EXPECT_TRUE(found_versions.contains(0));
+    EXPECT_TRUE(found_versions.contains(1));
+    EXPECT_TRUE(found_versions.contains(2));
+
+    // Verify the hole rowset (version 1) is empty
+    for (const auto& rs_meta : rs_metas) {
+        if (rs_meta->version().first == 1) {
+            EXPECT_TRUE(rs_meta->empty());
+            EXPECT_EQ(rs_meta->num_rows(), 0);
+            EXPECT_EQ(rs_meta->total_disk_size(), 0);
+        } else {
+            EXPECT_FALSE(rs_meta->empty());
+            EXPECT_EQ(rs_meta->num_rows(), 100);
+        }
+    }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_multiple_consecutive_holes) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1005, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, 
{{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, 
std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add versions 0, 5 (missing 1, 2, 3, 4 - multiple consecutive holes)
+    std::vector<int64_t> versions = {0, 5};
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version : versions) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1005);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Initially we have 2 rowsets (versions 0, 5)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
+
+    // Test fill_version_holes to fill multiple consecutive holes (versions 1, 2, 3, 4)
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 5)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
+
+    // Verify all versions are present
+    auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+    std::set<int64_t> found_versions;
+    for (const auto& rs_meta : rs_metas) {
+        found_versions.insert(rs_meta->version().first);
+    }
+    EXPECT_EQ(found_versions.size(), 6);
+    for (int64_t v = 0; v <= 5; ++v) {
+        EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+    }
+
+    // Verify the hole rowsets (versions 1, 2, 3, 4) are empty
+    for (const auto& rs_meta : rs_metas) {
+        if (rs_meta->version().first >= 1 && rs_meta->version().first <= 4) {
+            EXPECT_TRUE(rs_meta->empty())
+                    << "Version " << rs_meta->version().first << " should be 
empty";
+            EXPECT_EQ(rs_meta->num_rows(), 0);
+            EXPECT_EQ(rs_meta->total_disk_size(), 0);
+        } else {
+            EXPECT_FALSE(rs_meta->empty())
+                    << "Version " << rs_meta->version().first << " should not 
be empty";
+            EXPECT_EQ(rs_meta->num_rows(), 100);
+        }
+    }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_mixed_holes) {
+    CloudStorageEngine engine(EngineOptions {});
+    CloudMetaMgr meta_mgr;
+
+    TabletMetaSharedPtr tablet_meta(
+            new TabletMeta(1006, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                           UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+    auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
+
+    // Add versions 0, 2, 5, 6 (missing 1, 3, 4 and potential trailing holes up to max_version)
+    std::vector<int64_t> versions = {0, 2, 5, 6};
+    std::vector<RowsetSharedPtr> rowsets;
+    for (int64_t version : versions) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(1006);
+        rs_meta->set_index_id(2);
+        rs_meta->set_partition_id(15673);
+        rs_meta->set_tablet_uid(UniqueId(9, 10));
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_id(engine.next_rowset_id());
+        rs_meta->set_num_rows(100);
+        rs_meta->set_empty(false);
+        rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+        RowsetSharedPtr rowset;
+        auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
+                                                   rs_meta, &rowset);
+        EXPECT_TRUE(status.ok());
+        rowsets.push_back(rowset);
+    }
+
+    // Add all rowsets to tablet
+    {
+        std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+        tablet->add_rowsets(std::move(rowsets), false, lock, false);
+    }
+
+    // Initially we have 4 rowsets (versions 0, 2, 5, 6)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 4);
+
+    // Test fill_version_holes with max_version = 8 (should fill 1, 3, 4, 7, 8)
+    std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+    Status status = meta_mgr.fill_version_holes(tablet.get(), 8, wlock);
+    EXPECT_TRUE(status.ok());
+
+    // After filling holes, we should have 9 rowsets (versions 0-8)
+    EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 9);
+
+    // Verify all versions are present
+    auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+    std::set<int64_t> found_versions;
+    for (const auto& rs_meta : rs_metas) {
+        found_versions.insert(rs_meta->version().first);
+    }
+    EXPECT_EQ(found_versions.size(), 9);
+    for (int64_t v = 0; v <= 8; ++v) {
+        EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+    }
+
+    // Verify the hole rowsets (versions 1, 3, 4, 7, 8) are empty
+    std::set<int64_t> original_versions = {0, 2, 5, 6};
+    std::set<int64_t> hole_versions = {1, 3, 4, 7, 8};
+    for (const auto& rs_meta : rs_metas) {
+        int64_t version = rs_meta->version().first;
+        if (hole_versions.contains(version)) {
+            EXPECT_TRUE(rs_meta->empty()) << "Version " << version << " should be empty";
+            EXPECT_EQ(rs_meta->num_rows(), 0);
+            EXPECT_EQ(rs_meta->total_disk_size(), 0);
+        } else if (original_versions.contains(version)) {
+            EXPECT_FALSE(rs_meta->empty()) << "Version " << version << " should not be empty";
+            EXPECT_EQ(rs_meta->num_rows(), 100);
+        }
+    }
+}
+
 } // namespace doris
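The tests above exercise CloudMetaMgr::fill_version_holes, which pads every version missing from the tablet, up to the given max version, with an empty placeholder rowset so the BE does not need to fetch those rowsets from the meta service. A minimal, self-contained sketch of the hole detection they imply (the function name and the plain std::set input are illustrative, not the actual CloudMetaMgr API):

#include <cstdint>
#include <set>
#include <vector>

// Illustrative only: every version in [0, max_version] that the tablet does not
// already hold becomes a single-version hole to be filled with an empty rowset.
std::vector<int64_t> find_version_holes(const std::set<int64_t>& existing, int64_t max_version) {
    std::vector<int64_t> holes;
    for (int64_t v = 0; v <= max_version; ++v) {
        if (!existing.contains(v)) {
            holes.push_back(v);
        }
    }
    return holes;
}

With the mixed-holes input {0, 2, 5, 6} and max_version = 8, this yields {1, 3, 4, 7, 8}, matching the hole_versions set checked above; the trailing 7 and 8 are only discoverable because the caller knows the partition's current max version (see the new partition_max_version field below).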
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index d5efa55c606..8e4d8d93b14 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2461,7 +2461,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
                     LOG(WARNING) << msg;
                     return;
                 }
-
+                response->set_partition_max_version(version_pb.version());
                 if (version_pb.pending_txn_ids_size() > 0) {
                     DCHECK(version_pb.pending_txn_ids_size() == 1);
                     stats.get_bytes += txn->get_bytes();
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 588a5cca1d6..501dc4f43b3 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1000,7 +1000,9 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
 
     TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done", 
&num_rowsets);
 
-    if (num_rowsets < 1) {
+    // compaction.num_input_rowsets() is 0 when multiple hole rowsets are compacted,
+    // so we can continue to process the job in this case.
+    if (num_rowsets < 1 && compaction.num_input_rowsets() > 0) {
         SS << "too few input rowsets, tablet_id=" << tablet_id << " 
num_rowsets=" << num_rowsets;
         code = MetaServiceCode::UNDEFINED_ERR;
         msg = ss.str();
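The relaxed check above can be read as: zero persisted input rowsets is an error only if the BE reported that it actually had input rowsets; when every input was a hole rowset, num_input_rowsets() is 0 and the job is allowed to proceed. A hedged, free-standing restatement of that condition (names are illustrative, not the meta-service code):

#include <cstdint>

// Illustrative restatement of the guard in process_compaction_job():
// fail only when the BE claimed real inputs but none were found in the meta service.
bool too_few_input_rowsets(int64_t rowsets_found_in_meta, int64_t rowsets_reported_by_be) {
    return rowsets_found_in_meta < 1 && rowsets_reported_by_be > 0;
}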
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 8faa59e7ce7..17bf92c08aa 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2168,27 +2168,13 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, RecyclerMetricsContext&
     TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp);
 
     for (const auto& rs_meta : resp.rowset_meta()) {
-        /*
-        * For compatibility, we skip the loop for [0-1] here. 
-        * The purpose of this loop is to delete object files,
-        * and since [0-1] only has meta and doesn't have object files, 
-        * skipping it doesn't affect system correctness. 
-        *
-        * If not skipped, the check "if (!rs_meta.has_resource_id())" below 
-        * would return error -1 directly, causing the recycle operation to fail.
-        *
-        * [0-1] doesn't have resource id is a bug.
-        * In the future, we will fix this problem, after that,
-        * we can remove this if statement.
-        *
-        * TODO(Yukang-Lian): remove this if statement when [0-1] has resource id in the future.
-        */
-
-        if (rs_meta.end_version() == 1) {
-            // Assert that [0-1] has no resource_id to make sure
-            // this if statement will not be forgetted to remove
-            // when the resource id bug is fixed
-            DCHECK(!rs_meta.has_resource_id()) << "rs_meta" << rs_meta.ShortDebugString();
+        // The rowset has no resource id and no segments when it was generated by compaction
+        // with multiple hole rowsets or its version is [0-1], so we can skip it.
+        if (!rs_meta.has_resource_id() && rs_meta.num_segments() == 0) {
+            LOG_INFO("rowset meta has no resource id and no segments, skip this rowset")
+                    .tag("rs_meta", rs_meta.ShortDebugString())
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id);
             recycle_rowsets_number += 1;
             continue;
         }
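The recycler change replaces the version-based special case with a property-based one: a rowset meta that has neither a resource id nor any segments has no object files to delete, which covers both the historical [0-1] placeholder and the empty output of compacting only hole rowsets. A self-contained sketch of that predicate (the struct is a stand-in for the two RowsetMetaCloudPB accessors used above):

#include <cstdint>

// Stand-in for the two protobuf accessors read in the hunk above.
struct RowsetMetaView {
    bool has_resource_id;
    int64_t num_segments;
};

// No resource id and no segments means there is nothing in object storage to
// delete, so the recycler only counts the rowset and moves on.
bool has_no_object_data(const RowsetMetaView& rs_meta) {
    return !rs_meta.has_resource_id && rs_meta.num_segments == 0;
}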
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 987a6a15572..03f089e080a 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -4940,6 +4940,7 @@ TEST(RecyclerTest, recycle_tablet_without_resource_id) {
     sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", 
[](auto&& args) {
         auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
         auto* rs = resp->add_rowset_meta();
+        rs->set_num_segments(1); // force delete rowset data
         EXPECT_EQ(rs->has_resource_id(), false);
     });
     sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 3e18b2dde73..51fbf1e6cbc 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1077,7 +1077,9 @@ message GetRowsetResponse {
     repeated doris.RowsetMetaCloudPB rowset_meta = 2;
     optional TabletStatsPB stats = 3;
     // Return dict value if SchemaOp is RETURN_DICT
-    optional SchemaCloudDictionary schema_dict = 4; 
+    optional SchemaCloudDictionary schema_dict = 4;
+    // The current max version of the partition
+    optional int64 partition_max_version = 5;
 }
 
 message GetSchemaDictRequest {
diff --git a/regression-test/data/compaction/test_compaction_with_empty_rowset.out b/regression-test/data/compaction/test_compaction_with_empty_rowset.out
new file mode 100644
index 00000000000..8f98a92a26c
Binary files /dev/null and b/regression-test/data/compaction/test_compaction_with_empty_rowset.out differ
diff --git a/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out b/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out
new file mode 100644
index 00000000000..c3605547fe0
Binary files /dev/null and b/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out differ
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index 946f05cbb19..23d34651ce5 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -46,5 +46,5 @@ sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
 
 # So feature has bug, so by default is false, only open it in pipeline to observe
 enable_parquet_page_index=true
-
+enable_fuzzy_mode=true
 enable_prefill_all_dbm_agg_cache_after_compaction=true
diff --git a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
new file mode 100644
index 00000000000..6f6f869917d
--- /dev/null
+++ b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_compaction_mow_with_empty_rowset", "p0") {
+    def tableName = "test_compaction_with_empty_rowset"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `k3` smallint(6) NULL,
+      `k4` int(30) NULL,
+      `k5` largeint(40) NULL,
+      `k6` float NULL,
+      `k7` double NULL,
+      `k8` decimal(9, 0) NULL,
+      `k9` char(10) NULL,
+      `k10` varchar(1024) NULL,
+      `k11` text NULL,
+      `k12` date NULL,
+      `k13` datetime NULL
+    ) ENGINE=OLAP
+    unique KEY(k1, k2, k3)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "enable_unique_key_merge_on_write" = "true"
+    );
+    """
+
+    for (int i = 0; i < 10; i++) {
+        sql """ insert into ${tableName} values (1, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }   
+
+    qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+    def replicaNum = get_table_replica_num(tableName)
+    logger.info("get table replica num: " + replicaNum)
+
+    // trigger compactions for all tablets in ${tableName}
+    trigger_and_wait_compaction(tableName, "cumulative")
+    int rowCount = 0
+    for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        def (code, out, err) = curl("GET", tablet.CompactionStatus)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        for (String rowset in (List<String>) tabletJson.rowsets) {
+            rowCount += Integer.parseInt(rowset.split(" ")[1])
+        }
+    }
+    assert (rowCount < 10 * replicaNum)
+    qt_sql2 """ select * from ${tableName} order by k1, k2, k3 """
+
+    for (int i = 0; i < 10; i++) {
+        sql """ insert into ${tableName} values (2, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }   
+
+    // trigger compactions for all tablets in ${tableName}
+    trigger_and_wait_compaction(tableName, "cumulative")
+    rowCount = 0
+    for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        def (code, out, err) = curl("GET", tablet.CompactionStatus)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        for (String rowset in (List<String>) tabletJson.rowsets) {
+            rowCount += Integer.parseInt(rowset.split(" ")[1])
+        }
+    }
+    assert (rowCount < 20 * replicaNum)
+    qt_sql3 """ select * from ${tableName} order by k1, k2, k3 """
+}
diff --git a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
new file mode 100644
index 00000000000..a0fe1d58321
--- /dev/null
+++ b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_schema_change_mow_with_empty_rowset", "p0") {
+    def tableName = "test_sc_mow_with_empty_rowset"
+
+    def getJobState = { tbl ->
+        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1 """
+        return jobStateResult[0][9]
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `k3` smallint(6) NULL,
+      `k4` int(30) NULL,
+      `k5` largeint(40) NULL,
+      `k6` float NULL,
+      `k7` double NULL,
+      `k8` decimal(9, 0) NULL,
+      `k9` char(10) NULL,
+      `k10` varchar(1024) NULL,
+      `k11` text NULL,
+      `k12` date NULL,
+      `k13` datetime NULL
+    ) ENGINE=OLAP
+    unique KEY(k1, k2, k3)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "enable_unique_key_merge_on_write" = "true"
+    );
+    """
+
+    for (int i = 0; i < 100; i++) {
+        sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+    'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }   
+
+    sql """ alter table ${tableName} modify column k4 string NULL"""
+
+    for (int i = 0; i < 20; i++) {
+        sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 
8.8,
+    'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }   
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
+        {
+            String res = getJobState(tableName)
+            if (res == "FINISHED" || res == "CANCELLED") {
+                assertEquals("FINISHED", res)
+                return true
+            }
+            return false
+        }
+    )
+
+    qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
