This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 286b47f04a2 [feat](cloud) Add snapshot data migrator & chain compactor 
skeleton (#56939)
286b47f04a2 is described below

commit 286b47f04a21069462896e826afb4e08a94c8370
Author: walter <[email protected]>
AuthorDate: Thu Oct 16 14:04:39 2025 +0800

    [feat](cloud) Add snapshot data migrator & chain compactor skeleton (#56939)
---
 cloud/src/common/config.h                       |   5 +
 cloud/src/common/stopwatch.h                    |   4 +
 cloud/src/meta-store/keys.cpp                   |  18 +-
 cloud/src/meta-store/keys.h                     |  12 +-
 cloud/src/recycler/recycler.cpp                 |  27 ++
 cloud/src/recycler/recycler.h                   |   4 +
 cloud/src/recycler/snapshot_chain_compactor.cpp | 323 +++++++++++++++++++++
 cloud/src/recycler/snapshot_chain_compactor.h   | 111 ++++++++
 cloud/src/recycler/snapshot_data_migrator.cpp   | 362 ++++++++++++++++++++++++
 cloud/src/recycler/snapshot_data_migrator.h     | 132 +++++++++
 cloud/src/snapshot/snapshot_manager.cpp         |   8 +
 cloud/src/snapshot/snapshot_manager.h           |  10 +
 cloud/test/keys_test.cpp                        |  48 ++++
 gensrc/proto/cloud.proto                        |  29 ++
 14 files changed, 1091 insertions(+), 2 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 317dbff7e6b..eaebca1f827 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -376,8 +376,13 @@ CONF_Bool(enable_multi_version_status, "false");
 // New instance enable cluster snapshot, it only works when 
enable_multi_version_status is true.
 // The new instance snapshot switch status will be set to SNAPSHOT_SWITCH_ON, 
and the auto snapshot will be open.
 CONF_Bool(enable_cluster_snapshot, "false");
+CONF_Bool(enable_snapshot_data_migrator, "false");
+CONF_Bool(enable_snapshot_chain_compactor, "false");
+CONF_Int32(snapshot_data_migrator_concurrent, "2");
+CONF_Int32(snapshot_chain_compactor_concurrent, "2");
 
 CONF_mString(aws_credentials_provider_version, "v2");
 CONF_Validator(aws_credentials_provider_version,
                [](const std::string& config) -> bool { return config == "v1" 
|| config == "v2"; });
+
 } // namespace doris::cloud::config
diff --git a/cloud/src/common/stopwatch.h b/cloud/src/common/stopwatch.h
index 4ac9abaa7f6..fa9fcfa7667 100644
--- a/cloud/src/common/stopwatch.h
+++ b/cloud/src/common/stopwatch.h
@@ -66,6 +66,10 @@ public:
                 .count();
     }
 
+    // Elapsed time in whole milliseconds: elapsed_us() divided by 1000 (integer division).
+    int64_t elapsed_ms() const { return elapsed_us() / 1000; }
+
+    // Elapsed time in whole seconds: elapsed_ms() divided by 1000 (integer division).
+    int64_t elapsed_seconds() const { return elapsed_ms() / 1000; }
+
 private:
     std::chrono::steady_clock::time_point start_;
     std::chrono::steady_clock::duration elapsed_ {0};
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 53280b24d1f..153a936223c 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -68,6 +68,8 @@ static const char* JOB_KEY_INFIX_RL_PROGRESS            = 
"routine_load_progress
 static const char* JOB_KEY_INFIX_STREAMING_JOB          = "streaming_job";
 static const char* JOB_KEY_INFIX_RESTORE_TABLET         = "restore_tablet";
 static const char* JOB_KEY_INFIX_RESTORE_ROWSET         = "restore_rowset";
+static const char* JOB_KEY_INFIX_SNAPSHOT_DATA_MIGRATOR = 
"snapshot_data_migrator";
+static const char* JOB_KEY_INFIX_SNAPSHOT_CHAIN_COMPACTOR = 
"snapshot_chain_compactor";
 
 static const char* COPY_JOB_KEY_INFIX                   = "job";
 static const char* COPY_FILE_KEY_INFIX                  = "loading_file";
@@ -146,7 +148,8 @@ static void encode_prefix(const T& t, std::string* key) {
         MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, 
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
         RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, 
RecycleTxnKeyInfo, RecycleStageKeyInfo,
         StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, 
JobRestoreRowsetKeyInfo,
-        JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, 
StreamingJobKeyInfo,
+        JobTabletKeyInfo, JobRecycleKeyInfo, JobSnapshotDataMigratorKeyInfo, 
JobSnapshotChainCompactorKeyInfo,
+        RLJobProgressKeyInfo, StreamingJobKeyInfo,
         CopyJobKeyInfo, CopyFileKeyInfo,  StorageVaultKeyInfo, 
MetaSchemaPBDictionaryInfo,
         MowTabletJobInfo>);
 
@@ -183,6 +186,8 @@ static void encode_prefix(const T& t, std::string* key) {
         encode_bytes(STATS_KEY_PREFIX, key);
     } else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
                       || std::is_same_v<T, JobRecycleKeyInfo>
+                      || std::is_same_v<T, JobSnapshotDataMigratorKeyInfo>
+                      || std::is_same_v<T, JobSnapshotChainCompactorKeyInfo>
                       || std::is_same_v<T, RLJobProgressKeyInfo>
                       || std::is_same_v<T, StreamingJobKeyInfo>) {
         encode_bytes(JOB_KEY_PREFIX, key);
@@ -459,6 +464,17 @@ void job_check_key(const JobRecycleKeyInfo& in, 
std::string* out) {
     encode_bytes("check", out); // "check"
 }
 
+// Encodes the per-instance snapshot data migrator job key into `out`:
+//   0x01 "job" ${instance_id} "snapshot_data_migrator"
+void job_snapshot_data_migrator_key(const JobSnapshotDataMigratorKeyInfo& in, std::string* out) {
+    encode_prefix(in, out);                                  // 0x01 "job" ${instance_id}
+    encode_bytes(JOB_KEY_INFIX_SNAPSHOT_DATA_MIGRATOR, out); // "snapshot_data_migrator"
+}
+
+// Encodes the per-instance snapshot chain compactor job key into `out`:
+//   0x01 "job" ${instance_id} "snapshot_chain_compactor"
+void job_snapshot_chain_compactor_key(const JobSnapshotChainCompactorKeyInfo& in,
+                                      std::string* out) {
+    encode_prefix(in, out);                                    // 0x01 "job" ${instance_id}
+    encode_bytes(JOB_KEY_INFIX_SNAPSHOT_CHAIN_COMPACTOR, out); // "snapshot_chain_compactor"
+}
+
 void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* 
out) {
     encode_prefix(in, out);                       // 0x01 "job" ${instance_id}
     encode_bytes(JOB_KEY_INFIX_RL_PROGRESS, out); // "routine_load_progress"
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index b529efa450f..965b3eb0c1a 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -66,8 +66,9 @@
 // 0x01 "job" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} 
${tablet_id} -> TabletJobInfoPB
 // 0x01 "job" ${instance_id} "recycle"                                         
            -> JobRecyclePB
 // 0x01 "job" ${instance_id} "check"                                           
            -> JobRecyclePB
+// 0x01 "job" ${instance_id} "snapshot_data_migrator"                          
            -> JobRecyclePB
+// 0x01 "job" ${instance_id} "snapshot_chain_compactor"                        
            -> JobRecyclePB
 // 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id}                
            -> StreamingJobPB
-
 //
 // 0x01 "copy" ${instance_id} "job" ${stage_id} ${table_id} ${copy_id} 
${group_id}         -> CopyJobPB
 // 0x01 "copy" ${instance_id} "loading_file" ${stage_id} ${table_id} 
${obj_name} ${etag}   -> CopyFilePB
@@ -205,6 +206,11 @@ using RecycleStageKeyInfo  = BasicKeyInfo<19, 
std::tuple<std::string,  std::stri
 //                                                      0:instance_id
 using JobRecycleKeyInfo    = BasicKeyInfo<20 , std::tuple<std::string>>;
 
+//                                                      0:instance_id
+using JobSnapshotDataMigratorKeyInfo = BasicKeyInfo<53, 
std::tuple<std::string>>;
+//                                                      0:instance_id
+using JobSnapshotChainCompactorKeyInfo = BasicKeyInfo<54, 
std::tuple<std::string>>;
+
 //                                                      0:instance_id  
1:index_id  2:schema_version
 using MetaSchemaKeyInfo    = BasicKeyInfo<21, std::tuple<std::string,  
int64_t,    int64_t>>;
 
@@ -408,7 +414,11 @@ static inline std::string job_restore_rowset_key(const 
JobRestoreRowsetKeyInfo&
 
 void job_recycle_key(const JobRecycleKeyInfo& in, std::string* out);
 void job_check_key(const JobRecycleKeyInfo& in, std::string* out);
+void job_snapshot_data_migrator_key(const JobSnapshotDataMigratorKeyInfo& in, 
std::string* out);
+void job_snapshot_chain_compactor_key(const JobSnapshotChainCompactorKeyInfo& 
in, std::string* out);
 static inline std::string job_check_key(const JobRecycleKeyInfo& in) { 
std::string s; job_check_key(in, &s); return s; }
+static inline std::string job_snapshot_data_migrator_key(const 
JobSnapshotDataMigratorKeyInfo& in) { std::string s; 
job_snapshot_data_migrator_key(in, &s); return s; }
+static inline std::string job_snapshot_chain_compactor_key(const 
JobSnapshotChainCompactorKeyInfo& in) { std::string s; 
job_snapshot_chain_compactor_key(in, &s); return s; }
 void job_tablet_key(const JobTabletKeyInfo& in, std::string* out);
 static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { 
std::string s; job_tablet_key(in, &s); return s; }
 void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* 
out);
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 79fcbdcd45d..5f30536078f 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -415,6 +415,27 @@ int Recycler::start(brpc::Server* server) {
 
     workers_.emplace_back(std::mem_fn(&Recycler::lease_recycle_jobs), this);
     workers_.emplace_back(std::mem_fn(&Recycler::check_recycle_tasks), this);
+
+    if (config::enable_snapshot_data_migrator) {
+        snapshot_data_migrator_ = 
std::make_shared<SnapshotDataMigrator>(txn_kv_);
+        int ret = snapshot_data_migrator_->start();
+        if (ret != 0) {
+            LOG(ERROR) << "failed to start snapshot data migrator";
+            return ret;
+        }
+        LOG(INFO) << "snapshot data migrator started";
+    }
+
+    if (config::enable_snapshot_chain_compactor) {
+        snapshot_chain_compactor_ = 
std::make_shared<SnapshotChainCompactor>(txn_kv_);
+        int ret = snapshot_chain_compactor_->start();
+        if (ret != 0) {
+            LOG(ERROR) << "failed to start snapshot chain compactor";
+            return ret;
+        }
+        LOG(INFO) << "snapshot chain compactor started";
+    }
+
     return 0;
 }
 
@@ -434,6 +455,12 @@ void Recycler::stop() {
     if (checker_) {
         checker_->stop();
     }
+    if (snapshot_data_migrator_) {
+        snapshot_data_migrator_->stop();
+    }
+    if (snapshot_chain_compactor_) {
+        snapshot_chain_compactor_->stop();
+    }
 }
 
 class InstanceRecycler::InvertedIndexIdCache {
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 2df94e63974..3d0b12da142 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -34,6 +34,8 @@
 #include "common/bvars.h"
 #include "meta-service/txn_lazy_committer.h"
 #include "meta-store/versionstamp.h"
+#include "recycler/snapshot_chain_compactor.h"
+#include "recycler/snapshot_data_migrator.h"
 #include "recycler/storage_vault_accessor.h"
 #include "recycler/white_black_list.h"
 #include "snapshot/snapshot_manager.h"
@@ -118,6 +120,8 @@ private:
 
     std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
     std::shared_ptr<SnapshotManager> snapshot_manager_;
+    std::shared_ptr<SnapshotDataMigrator> snapshot_data_migrator_;
+    std::shared_ptr<SnapshotChainCompactor> snapshot_chain_compactor_;
 };
 
 enum class RowsetRecyclingState {
diff --git a/cloud/src/recycler/snapshot_chain_compactor.cpp 
b/cloud/src/recycler/snapshot_chain_compactor.cpp
new file mode 100644
index 00000000000..f702f8326a8
--- /dev/null
+++ b/cloud/src/recycler/snapshot_chain_compactor.cpp
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/snapshot_chain_compactor.h"
+
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-store/keys.h"
+#include "mock_accessor.h"
+#include "recycler/hdfs_accessor.h"
+#include "recycler/s3_accessor.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+// Keeps a shared handle to the KV store; no threads are created until start().
+SnapshotChainCompactor::SnapshotChainCompactor(std::shared_ptr<TxnKv> txn_kv)
+        : txn_kv_ {std::move(txn_kv)} {}
+
+// Stops the compactor on destruction unless stop() has already been called.
+SnapshotChainCompactor::~SnapshotChainCompactor() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Spawns the background threads: one instance scanner, one lease renewer and
+// `config::snapshot_chain_compactor_concurrent` compaction workers. Always returns 0.
+int SnapshotChainCompactor::start() {
+    workers_.emplace_back(&SnapshotChainCompactor::scan_instance_loop, this);
+    workers_.emplace_back(&SnapshotChainCompactor::lease_compaction_jobs, this);
+    for (int worker = 0; worker < config::snapshot_chain_compactor_concurrent; ++worker) {
+        workers_.emplace_back(&SnapshotChainCompactor::compaction_loop, this);
+    }
+
+    LOG_INFO("snapshot chain compactor started")
+            .tag("concurrent", config::snapshot_chain_compactor_concurrent);
+
+    return 0;
+}
+
+// Stops the compactor: raises the stop flag, asks every in-flight instance compactor to
+// stop, wakes all worker threads and joins them.
+void SnapshotChainCompactor::stop() {
+    {
+        // Set the flag while holding mtx_. The workers evaluate their wait predicates under
+        // mtx_, so a flag flipped without the lock could land between a worker's predicate
+        // check and its block on the condition variable. The notify below would then be
+        // missed, and compaction_loop(), which waits without a timeout, would never wake up
+        // and the join at the end of this function would hang.
+        std::lock_guard lock(mtx_);
+        stopped_ = true;
+        for (auto& [_, compactor] : compacting_instance_map_) {
+            compactor->stop();
+        }
+    }
+    notifier_.notify_all();
+    pending_instance_cond_.notify_all();
+    for (auto& w : workers_) {
+        if (w.joinable()) w.join();
+    }
+}
+
+// Worker thread body: repeatedly takes one pending instance off the queue and runs a
+// snapshot chain compaction for it, until the compactor is stopped.
+void SnapshotChainCompactor::compaction_loop() {
+    while (!stopped()) {
+        // fetch instance to compact; blocks until one is queued or stop() is called
+        InstanceInfoPB instance;
+        {
+            std::unique_lock lock(mtx_);
+            pending_instance_cond_.wait(
+                    lock, [this] { return stopped() || !pending_instance_queue_.empty(); });
+            if (stopped()) {
+                return;
+            }
+            instance = std::move(pending_instance_queue_.front());
+            pending_instance_queue_.pop_front();
+            pending_instance_set_.erase(instance.instance_id());
+        }
+
+        const auto& id = instance.instance_id();
+        {
+            std::lock_guard lock(mtx_);
+            // this process is already compacting the instance, nothing to do
+            if (compacting_instance_map_.find(id) != compacting_instance_map_.end()) {
+                continue;
+            }
+        }
+
+        auto compactor = std::make_shared<InstanceChainCompactor>(txn_kv_, instance);
+        if (compactor->init() != 0) {
+            LOG(WARNING) << "failed to init instance compactor, instance_id=" << id;
+            continue;
+        }
+
+        std::string job_key = job_snapshot_chain_compactor_key(id);
+        int ret = prepare_instance_recycle_job(txn_kv_.get(), job_key, id, ip_port_,
+                                               config::recycle_job_lease_expired_ms * 1000);
+        if (ret != 0) { // Prepare failed
+            continue;
+        }
+        {
+            // Publish the compactor so the lease thread and stop() can see it.
+            std::lock_guard lock(mtx_);
+            compacting_instance_map_.emplace(id, compactor);
+        }
+        if (stopped()) return;
+
+        compactor->do_compact();
+        {
+            std::lock_guard lock(mtx_);
+            compacting_instance_map_.erase(id);
+        }
+    }
+}
+
+// Scanner thread body: after an initial delay, periodically lists all instances and queues
+// those that are not deleted and whose snapshot chain needs compaction.
+void SnapshotChainCompactor::scan_instance_loop() {
+    {
+        // Initial delay before the first scan. Waiting on notifier_ instead of sleeping lets
+        // stop() cut the delay short, so shutdown does not have to sit out the whole interval.
+        std::unique_lock lock(mtx_);
+        notifier_.wait_for(lock,
+                           std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds),
+                           [&]() { return stopped(); });
+    }
+    while (!stopped()) {
+        std::vector<InstanceInfoPB> instances;
+        get_all_instances(txn_kv_.get(), instances);
+        LOG(INFO) << "Snapshot chain compactor get instances: " << [&instances] {
+            std::stringstream ss;
+            for (auto& i : instances) ss << ' ' << i.instance_id();
+            return ss.str();
+        }();
+        if (!instances.empty()) {
+            // enqueue instances
+            std::lock_guard lock(mtx_);
+            for (auto& instance : instances) {
+                if (instance.status() == InstanceInfoPB::DELETED) continue;
+                if (!is_snapshot_chain_need_compact(instance)) continue;
+                auto [_, success] = pending_instance_set_.insert(instance.instance_id());
+                // skip instance already in pending queue
+                if (success) {
+                    pending_instance_queue_.push_back(std::move(instance));
+                }
+            }
+            pending_instance_cond_.notify_all();
+        }
+        {
+            // Sleep until the next scan round, or until stop() wakes us up.
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock, std::chrono::seconds(config::recycle_interval_seconds),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Lease thread body: periodically renews the lease of every compaction job running in this
+// process, and stops the local compactor of an instance when the renewal returns 1.
+void SnapshotChainCompactor::lease_compaction_jobs() {
+    while (!stopped()) {
+        std::vector<std::string> instances;
+        {
+            // Snapshot the ids under the lock. reserve() reads the map size, so it has to
+            // happen here as well: compaction workers insert into and erase from the map
+            // concurrently.
+            std::lock_guard lock(mtx_);
+            instances.reserve(compacting_instance_map_.size());
+            for (auto& [id, _] : compacting_instance_map_) {
+                instances.push_back(id);
+            }
+        }
+        for (auto& i : instances) {
+            std::string job_key = job_snapshot_chain_compactor_key(i);
+            int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_);
+            // NOTE(review): a return value of 1 presumably means the job is no longer held
+            // by this process -- confirm against lease_instance_recycle_job.
+            if (ret == 1) {
+                std::lock_guard lock(mtx_);
+                if (auto it = compacting_instance_map_.find(i);
+                    it != compacting_instance_map_.end()) {
+                    it->second->stop();
+                }
+            }
+        }
+        {
+            // Renew three times per lease period; stop() interrupts the wait.
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock,
+                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Decides whether an instance's snapshot chain should be compacted.
+// Placeholder: it currently returns false for every instance, so scan_instance_loop()
+// never queues anything.
+bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const 
InstanceInfoPB& instance_info) {
+    // TODO:
+    return false;
+}
+
+// Stores the KV handle and a copy of the instance info; instance_id_ is cached from it.
+// Accessors are not created here -- call init() before do_compact().
+InstanceChainCompactor::InstanceChainCompactor(std::shared_ptr<TxnKv> txn_kv,
+                                               const InstanceInfoPB& instance)
+        : txn_kv_(std::move(txn_kv)),
+          instance_id_(instance.instance_id()),
+          instance_info_(instance) {}
+
+// Marks the compactor as stopped on destruction if nobody did so before.
+InstanceChainCompactor::~InstanceChainCompactor() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Creates the storage accessors: object-store accessors first, then storage-vault
+// accessors. Returns 0 on success, otherwise the first non-zero error code.
+int InstanceChainCompactor::init() {
+    if (int rc = init_obj_store_accessors(); rc != 0) {
+        return rc;
+    }
+    return init_storage_vault_accessors();
+}
+
+// Creates one accessor per entry of instance_info_.obj_info() and registers it in
+// accessor_map_ under the entry's id. UNIT_TEST builds use a MockAccessor instead of S3.
+// Returns 0 on success, non-zero as soon as one accessor cannot be created.
+int InstanceChainCompactor::init_obj_store_accessors() {
+    for (const auto& store : instance_info_.obj_info()) {
+#ifdef UNIT_TEST
+        auto accessor = std::make_shared<MockAccessor>();
+#else
+        auto conf = S3Conf::from_obj_store_info(store);
+        if (!conf) {
+            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
+            return -1;
+        }
+
+        std::shared_ptr<S3Accessor> accessor;
+        if (int rc = S3Accessor::create(std::move(*conf), &accessor); rc != 0) {
+            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
+                         << " resource_id=" << store.id();
+            return rc;
+        }
+#endif
+
+        accessor_map_.emplace(store.id(), std::move(accessor));
+    }
+
+    return 0;
+}
+
+// Scans the instance's storage vault records in the KV store and registers an accessor
+// for each vault in accessor_map_, keyed by vault id.
+// Does nothing when the instance has no resource ids. Returns 0 on success, non-zero when
+// a record cannot be parsed, an accessor cannot be created, or the range scan fails.
+int InstanceChainCompactor::init_storage_vault_accessors() {
+    if (instance_info_.resource_ids().empty()) {
+        return 0;
+    }
+
+    FullRangeGetOptions opts(txn_kv_);
+    opts.prefetch = true;
+    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
+                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
+
+    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
+        auto [k, v] = *kv;
+        StorageVaultPB vault;
+        if (!vault.ParseFromArray(v.data(), v.size())) {
+            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
+            return -1;
+        }
+        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+                                 &accessor_map_, &vault);
+        if (vault.has_hdfs_info()) {
+#ifdef ENABLE_HDFS_STORAGE_VAULT
+            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
+            int ret = accessor->init();
+            if (ret != 0) {
+                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
+                             << " resource_id=" << vault.id() << " name=" << vault.name();
+                return ret;
+            }
+
+            accessor_map_.emplace(vault.id(), std::move(accessor));
+#else
+            // The vault is only reported; no accessor is registered and the scan goes on.
+            LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
+                       << "but HDFS storage vaults were detected";
+#endif
+        } else if (vault.has_obj_info()) {
+#ifdef UNIT_TEST
+            auto accessor = std::make_shared<MockAccessor>();
+#else
+            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
+            if (!s3_conf) {
+                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
+                return -1;
+            }
+
+            std::shared_ptr<S3Accessor> accessor;
+            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+            if (ret != 0) {
+                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
+                             << " resource_id=" << vault.id() << " name=" << vault.name();
+                return ret;
+            }
+#endif
+
+            accessor_map_.emplace(vault.id(), std::move(accessor));
+        }
+    }
+
+    // The loop also ends when the iterator hits an error, so check it before reporting success.
+    if (!it->is_valid()) {
+        LOG_WARNING("failed to get storage vault kv");
+        return -1;
+    }
+    return 0;
+}
+
+// Runs one snapshot chain compaction for this instance through SnapshotManager and then
+// the completion hook. Logs start, end and elapsed seconds. Returns 0 on success.
+int InstanceChainCompactor::do_compact() {
+    LOG_WARNING("start snapshot chain compaction").tag("instance_id", instance_id_);
+
+    StopWatch timer;
+    DORIS_CLOUD_DEFER {
+        LOG_WARNING("snapshot chain compaction done")
+                .tag("instance_id", instance_id_)
+                .tag("cost(sec)", timer.elapsed_seconds());
+    };
+
+    SnapshotManager snapshot_mgr(txn_kv_);
+    if (int rc = snapshot_mgr.compact_snapshot_chains(this); rc != 0) {
+        LOG_WARNING("failed to compact snapshot chains").tag("instance_id", instance_id_);
+        return rc;
+    }
+
+    return handle_compaction_completion();
+}
+
+// Hook called by do_compact() after a successful compaction.
+// Placeholder: it currently does nothing and returns 0.
+int InstanceChainCompactor::handle_compaction_completion() {
+    // TODO:
+    return 0;
+}
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/snapshot_chain_compactor.h 
b/cloud/src/recycler/snapshot_chain_compactor.h
new file mode 100644
index 00000000000..f7d81ef95d2
--- /dev/null
+++ b/cloud/src/recycler/snapshot_chain_compactor.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "meta-service/txn_lazy_committer.h"
+#include "recycler/storage_vault_accessor.h"
+#include "snapshot/snapshot_manager.h"
+
+namespace doris::cloud {
+
+class InstanceChainCompactor;
+
+// Background service that schedules snapshot chain compaction: a scanner thread queues
+// instances, worker threads run an InstanceChainCompactor per instance, and a lease thread
+// keeps the jobs of in-flight compactions renewed.
+class SnapshotChainCompactor {
+public:
+    explicit SnapshotChainCompactor(std::shared_ptr<TxnKv> txn_kv);
+    ~SnapshotChainCompactor();
+
+    // returns 0 for success otherwise error
+    int start();
+
+    // Signals all threads to stop and joins them.
+    void stop();
+
+    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+private:
+    // Thread bodies started by start().
+    void scan_instance_loop();
+    void compaction_loop();
+    void lease_compaction_jobs();
+
+    bool is_snapshot_chain_need_compact(const InstanceInfoPB& instance_info);
+
+    std::shared_ptr<TxnKv> txn_kv_;
+    std::atomic_bool stopped_ {false};
+    // Passed to prepare_instance_recycle_job / lease_instance_recycle_job.
+    // NOTE(review): the constructor only initializes txn_kv_, so this looks like it stays
+    // empty -- confirm whether it should be set from the process' ip/port config.
+    std::string ip_port_;
+    std::vector<std::thread> workers_;
+
+    // Guards the pending queue/set and compacting_instance_map_.
+    std::mutex mtx_;
+    // notify compaction workers
+    std::condition_variable pending_instance_cond_;
+    std::deque<InstanceInfoPB> pending_instance_queue_;
+    // Ids of the instances in pending_instance_queue_, used to avoid queueing one twice.
+    std::unordered_set<std::string> pending_instance_set_;
+    // Instances currently being compacted by this process, keyed by instance id.
+    std::unordered_map<std::string, std::shared_ptr<InstanceChainCompactor>>
+            compacting_instance_map_;
+    // notify instance scanner and lease thread
+    std::condition_variable notifier_;
+
+    // NOTE(review): the two members below are not referenced by the visible
+    // implementation -- confirm they are needed.
+    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
+    std::shared_ptr<SnapshotManager> snapshot_manager_;
+};
+
+// Runs snapshot chain compaction for a single instance.
+// Usage: construct, call init() to create the storage accessors, then do_compact().
+class InstanceChainCompactor {
+public:
+    InstanceChainCompactor(std::shared_ptr<TxnKv> txn_kv, const 
InstanceInfoPB& instance);
+    ~InstanceChainCompactor();
+
+    // The returned view refers to instance_id_ and is only valid while this object lives.
+    std::string_view instance_id() const { return instance_id_; }
+    const InstanceInfoPB& instance_info() const { return instance_info_; }
+
+    // returns 0 for success otherwise error
+    int init();
+
+    // stop() only raises a flag that stopped() reports.
+    void stop() { stopped_.store(true, std::memory_order_release); }
+    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+    // returns 0 for success otherwise error
+    int do_compact();
+
+private:
+    // returns 0 for success otherwise error
+    int init_obj_store_accessors();
+
+    // returns 0 for success otherwise error
+    int init_storage_vault_accessors();
+
+    // Called by do_compact() after a successful compaction.
+    int handle_compaction_completion();
+
+    std::atomic_bool stopped_ {false};
+    std::shared_ptr<TxnKv> txn_kv_;
+    std::string instance_id_;
+    InstanceInfoPB instance_info_;
+
+    // Storage accessors keyed by object-store info id or storage vault id.
+    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> 
accessor_map_;
+};
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/snapshot_data_migrator.cpp 
b/cloud/src/recycler/snapshot_data_migrator.cpp
new file mode 100644
index 00000000000..57c495aa886
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_migrator.cpp
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/snapshot_data_migrator.h"
+
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/defer.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-store/keys.h"
+#include "mock_accessor.h"
+#include "recycler/hdfs_accessor.h"
+#include "recycler/s3_accessor.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+// Only stores the KV handle. No thread is created here; the workers are
+// spawned by start().
+SnapshotDataMigrator::SnapshotDataMigrator(std::shared_ptr<TxnKv> txn_kv)
+        : txn_kv_(std::move(txn_kv)) {}
+
+// Stops and joins the worker threads unless stop() has already been called,
+// so that no thread outlives the members it uses.
+SnapshotDataMigrator::~SnapshotDataMigrator() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Spawns the background threads: one instance scanner, one job-lease renewer
+// and `config::snapshot_data_migrator_concurrent` migration workers.
+// Always returns 0.
+int SnapshotDataMigrator::start() {
+    workers_.emplace_back(&SnapshotDataMigrator::scan_instance_loop, this);
+    workers_.emplace_back(&SnapshotDataMigrator::lease_migration_jobs, this);
+
+    int spawned = 0;
+    while (spawned < config::snapshot_data_migrator_concurrent) {
+        workers_.emplace_back(&SnapshotDataMigrator::migration_loop, this);
+        ++spawned;
+    }
+
+    LOG_INFO("snapshot data migrator started")
+            .tag("concurrent", config::snapshot_data_migrator_concurrent);
+
+    return 0;
+}
+
+// Stops the migrator: raises the stop flag, asks every in-flight instance
+// migrator to stop, wakes all waiting workers and joins the worker threads.
+void SnapshotDataMigrator::stop() {
+    {
+        // The flag must be changed while holding mtx_, even though it is atomic.
+        // Otherwise a worker that has just evaluated its wait predicate (still
+        // false) but has not yet blocked would miss the notification below; the
+        // migration workers wait without a timeout, so join() could hang forever.
+        std::lock_guard lock(mtx_);
+        stopped_ = true;
+        for (auto& [_, migrator] : migrating_instance_map_) {
+            migrator->stop();
+        }
+    }
+    // Notify after releasing the lock so woken threads can re-acquire it at once.
+    notifier_.notify_all();
+    pending_instance_cond_.notify_all();
+    for (auto& w : workers_) {
+        if (w.joinable()) w.join();
+    }
+}
+
+// Body of a migration worker thread.
+//
+// Repeatedly takes one pending instance from the queue, builds an
+// InstanceDataMigrator for it, registers the migration job in the KV store and
+// runs the migration. Returns when the migrator is stopped.
+void SnapshotDataMigrator::migration_loop() {
+    while (!stopped()) {
+        // Fetch the next pending instance.
+        InstanceInfoPB instance;
+        {
+            std::unique_lock lock(mtx_);
+            pending_instance_cond_.wait(
+                    lock, [&]() -> bool { return !pending_instance_queue_.empty() || stopped(); });
+            if (stopped()) {
+                return;
+            }
+            instance = std::move(pending_instance_queue_.front());
+            pending_instance_queue_.pop_front();
+            pending_instance_set_.erase(instance.instance_id());
+            // Skip an instance that another worker of this process is already
+            // migrating. Checked under the same lock as the dequeue, so mtx_ is
+            // not acquired twice in a row.
+            if (migrating_instance_map_.count(instance.instance_id()) != 0) continue;
+        }
+        const auto& instance_id = instance.instance_id();
+
+        auto migrator = std::make_shared<InstanceDataMigrator>(txn_kv_, instance, migrate_context_);
+        if (migrator->init() != 0) {
+            LOG(WARNING) << "failed to init instance migrator, instance_id="
+                         << instance.instance_id();
+            continue;
+        }
+
+        std::string job_key = job_snapshot_data_migrator_key(instance.instance_id());
+        int ret =
+                prepare_instance_recycle_job(txn_kv_.get(), job_key, instance.instance_id(),
+                                             ip_port_, config::recycle_job_lease_expired_ms * 1000);
+        if (ret != 0) { // Prepare failed
+            continue;
+        } else {
+            // Publish the migrator so the lease thread renews its job and stop()
+            // can reach it.
+            std::lock_guard lock(mtx_);
+            migrating_instance_map_.emplace(instance_id, migrator);
+        }
+        if (stopped()) return;
+
+        // The result is not inspected here; do_migrate() logs its own failures.
+        migrator->do_migrate();
+        {
+            std::lock_guard lock(mtx_);
+            migrating_instance_map_.erase(instance.instance_id());
+        }
+    }
+}
+
+// Body of the instance scanner thread.
+//
+// After an initial delay it periodically lists all instances and enqueues the
+// ones that still need migration, then wakes the migration workers. Instances
+// already present in the pending queue are not enqueued twice.
+void SnapshotDataMigrator::scan_instance_loop() {
+    {
+        // Initial scheduling delay. Wait on the notifier instead of sleeping, so
+        // that stop() does not have to sit out the whole delay before it can
+        // join this thread.
+        std::unique_lock lock(mtx_);
+        notifier_.wait_for(
+                lock, std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds),
+                [&]() { return stopped(); });
+    }
+    while (!stopped()) {
+        std::vector<InstanceInfoPB> instances;
+        get_all_instances(txn_kv_.get(), instances);
+        LOG(INFO) << "Snapshot data migrator get instances: " << [&instances] {
+            std::stringstream ss;
+            for (auto& i : instances) ss << ' ' << i.instance_id();
+            return ss.str();
+        }();
+        if (!instances.empty()) {
+            // enqueue instances
+            std::lock_guard lock(mtx_);
+            for (auto& instance : instances) {
+                if (instance.status() == InstanceInfoPB::DELETED) continue;
+                if (!is_instance_need_migrate(instance)) continue;
+                auto [_, success] = pending_instance_set_.insert(instance.instance_id());
+                // skip instance already in pending queue
+                if (success) {
+                    pending_instance_queue_.push_back(std::move(instance));
+                }
+            }
+            pending_instance_cond_.notify_all();
+        }
+        {
+            // Sleep until the next scan, or until stop() wakes us up.
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Body of the lease thread.
+//
+// Every `recycle_job_lease_expired_ms / 3` milliseconds it renews the job lease
+// of each instance that is currently being migrated by this process. When the
+// renewal returns 1, the corresponding instance migrator is asked to stop.
+// NOTE(review): a return value of 1 presumably means the job is no longer owned
+// by this node -- confirm against lease_instance_recycle_job.
+void SnapshotDataMigrator::lease_migration_jobs() {
+    while (!stopped()) {
+        std::vector<std::string> instances;
+        {
+            // Snapshot the ids under the lock. The map is mutated concurrently by
+            // the migration workers, so even size() must not be read outside it.
+            std::lock_guard lock(mtx_);
+            instances.reserve(migrating_instance_map_.size());
+            for (auto& [id, _] : migrating_instance_map_) {
+                instances.push_back(id);
+            }
+        }
+        // The KV round trips are done without holding mtx_.
+        for (auto& i : instances) {
+            std::string job_key = job_snapshot_data_migrator_key(i);
+            int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_);
+            if (ret == 1) {
+                std::lock_guard lock(mtx_);
+                // The migration may have finished in the meantime, so look it up again.
+                if (auto it = migrating_instance_map_.find(i);
+                    it != migrating_instance_map_.end()) {
+                    it->second->stop();
+                }
+            }
+        }
+        {
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock,
+                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// An instance needs migration when its multi-version status is anything other
+// than MULTI_VERSION_DISABLED while its snapshot switch is still
+// SNAPSHOT_SWITCH_DISABLED.
+bool SnapshotDataMigrator::is_instance_need_migrate(const InstanceInfoPB& instance_info) {
+    if (instance_info.multi_version_status() == MultiVersionStatus::MULTI_VERSION_DISABLED) {
+        return false;
+    }
+    return instance_info.snapshot_switch_status() ==
+           SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
+}
+
+// Copies the instance info and its id, and keeps a reference (not a copy) to
+// the shared migrate context, which therefore has to outlive this object.
+InstanceDataMigrator::InstanceDataMigrator(std::shared_ptr<TxnKv> txn_kv,
+                                           const InstanceInfoPB& instance,
+                                           SnapshotDataMigrateContext& 
migrate_context)
+        : txn_kv_(std::move(txn_kv)),
+          instance_id_(instance.instance_id()),
+          instance_info_(instance),
+          migrate_context_(migrate_context) {}
+
+// Raises the stop flag on destruction if nobody did so before.
+InstanceDataMigrator::~InstanceDataMigrator() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Builds the accessor map: object-store accessors first, storage-vault
+// accessors second. Returns 0 on success, or the first non-zero error code.
+int InstanceDataMigrator::init() {
+    if (int rc = init_obj_store_accessors(); rc != 0) {
+        return rc;
+    }
+    return init_storage_vault_accessors();
+}
+
+// Registers one accessor per `obj_info` entry of the instance, keyed by the
+// entry id. Under UNIT_TEST a MockAccessor is registered instead of a real S3
+// accessor. Returns 0 on success, -1 when the S3 configuration cannot be built,
+// or the error code reported by S3Accessor::create.
+int InstanceDataMigrator::init_obj_store_accessors() {
+    for (const auto& info : instance_info_.obj_info()) {
+#ifdef UNIT_TEST
+        auto accessor = std::make_shared<MockAccessor>();
+#else
+        auto conf = S3Conf::from_obj_store_info(info);
+        if (!conf) {
+            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
+            return -1;
+        }
+
+        std::shared_ptr<S3Accessor> accessor;
+        if (int rc = S3Accessor::create(std::move(*conf), &accessor); rc != 0) {
+            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
+                         << " resource_id=" << info.id();
+            return rc;
+        }
+#endif
+
+        accessor_map_.emplace(info.id(), std::move(accessor));
+    }
+
+    return 0;
+}
+
+// Registers one accessor per storage vault of the instance, keyed by vault id.
+//
+// Does nothing (returns 0) when the instance has no resource ids. Otherwise it
+// range-scans the storage vault keys of this instance and creates an HDFS or
+// object-store accessor depending on which info the vault carries; a vault
+// with neither is ignored.
+// Returns 0 on success, -1 for a malformed vault value, an unusable S3
+// configuration or a failed range scan, or the error code of the accessor
+// initialization that failed.
+int InstanceDataMigrator::init_storage_vault_accessors() {
+    if (instance_info_.resource_ids().empty()) {
+        return 0;
+    }
+
+    FullRangeGetOptions opts(txn_kv_);
+    opts.prefetch = true;
+    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
+                                      storage_vault_key({instance_id_, 
"\xff"}), std::move(opts));
+
+    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
+        auto [k, v] = *kv;
+        StorageVaultPB vault;
+        if (!vault.ParseFromArray(v.data(), v.size())) {
+            LOG(WARNING) << "malformed storage vault, unable to deserialize 
key=" << hex(k);
+            return -1;
+        }
+        // Test hook. Note that the sync point name is the InstanceRecycler one.
+        
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+                                 &accessor_map_, &vault);
+        if (vault.has_hdfs_info()) {
+#ifdef ENABLE_HDFS_STORAGE_VAULT
+            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
+            int ret = accessor->init();
+            if (ret != 0) {
+                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" 
<< instance_id_
+                             << " resource_id=" << vault.id() << " name=" << 
vault.name();
+                return ret;
+            }
+
+            accessor_map_.emplace(vault.id(), std::move(accessor));
+#else
+            // Without HDFS support the vault is only reported: no accessor is
+            // registered for it and the scan continues.
+            LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT 
build option), "
+                       << "but HDFS storage vaults were detected";
+#endif
+        } else if (vault.has_obj_info()) {
+#ifdef UNIT_TEST
+            auto accessor = std::make_shared<MockAccessor>();
+#else
+            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
+            if (!s3_conf) {
+                LOG(WARNING) << "failed to init object accessor, instance_id=" 
<< instance_id_;
+                return -1;
+            }
+
+            std::shared_ptr<S3Accessor> accessor;
+            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+            if (ret != 0) {
+                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << 
instance_id_
+                             << " resource_id=" << vault.id() << " name=" << 
vault.name();
+                return ret;
+            }
+#endif
+
+            accessor_map_.emplace(vault.id(), std::move(accessor));
+        }
+    }
+
+    // The loop also ends when the iterator becomes invalid, so the iterator
+    // state is checked after the scan.
+    if (!it->is_valid()) {
+        LOG_WARNING("failed to get storage vault kv");
+        return -1;
+    }
+    return 0;
+}
+
+// Runs the migration of this instance: converts the keys through
+// SnapshotManager::migrate_to_versioned_keys and, on success, updates the
+// snapshot switch status of the instance. The elapsed time is logged on every
+// exit path. Returns 0 on success, otherwise the error of the failing step.
+int InstanceDataMigrator::do_migrate() {
+    LOG_WARNING("start data migration").tag("instance_id", instance_id_);
+
+    StopWatch timer;
+    DORIS_CLOUD_DEFER {
+        LOG_WARNING("data migration done")
+                .tag("instance_id", instance_id_)
+                .tag("cost(sec)", timer.elapsed_seconds());
+    };
+
+    SnapshotManager manager(txn_kv_);
+    if (int rc = manager.migrate_to_versioned_keys(this); rc != 0) {
+        LOG_WARNING("failed to migrate snapshot keys").tag("instance_id", instance_id_);
+        return rc;
+    }
+
+    return enable_instance_snapshot_switch();
+}
+
+// Marks the migration as finished in the instance record.
+//
+// In a single transaction the instance info is read, its snapshot switch status
+// is set to SNAPSHOT_SWITCH_OFF (no longer SNAPSHOT_SWITCH_DISABLED, so the
+// instance stops qualifying for migration) and written back.
+// Returns 0 on success and -1 on any transaction or parse failure.
+int InstanceDataMigrator::enable_instance_snapshot_switch() {
+    std::unique_ptr<Transaction> txn;
+    if (TxnErrorCode err = txn_kv_->create_txn(&txn); err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create txn for data migration").tag("instance_id", instance_id_);
+        return -1;
+    }
+
+    const std::string ikey = instance_key(instance_id_);
+    std::string serialized;
+    if (TxnErrorCode err = txn->get(ikey, &serialized); err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to get instance info in data migration")
+                .tag("instance_id", instance_id_)
+                .tag("error", err);
+        return -1;
+    }
+
+    InstanceInfoPB info;
+    if (!info.ParseFromString(serialized)) {
+        LOG_WARNING("failed to parse instance info in data migration")
+                .tag("instance_id", instance_id_);
+        return -1;
+    }
+
+    info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_OFF);
+    txn->put(ikey, info.SerializeAsString());
+    if (TxnErrorCode err = txn->commit(); err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to commit instance info in data migration")
+                .tag("instance_id", instance_id_)
+                .tag("error", err);
+        return -1;
+    }
+
+    LOG_WARNING("finish data migration").tag("instance_id", instance_id_);
+
+    return 0;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/recycler/snapshot_data_migrator.h 
b/cloud/src/recycler/snapshot_data_migrator.h
new file mode 100644
index 00000000000..ec4cae009f9
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_migrator.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "meta-service/txn_lazy_committer.h"
+#include "recycler/storage_vault_accessor.h"
+#include "snapshot/snapshot_manager.h"
+
+namespace doris::cloud {
+// class TxnKv;
+// class InstanceInfoPB;
+// class StorageVaultAccessor;
+// class SimpleThreadPool;
+// class Checker;
+// class SnapshotDataMigratorMetricsContext;
+class InstanceDataMigrator;
+
+// Bookkeeping shared between instance migrators: a single context object is
+// owned by SnapshotDataMigrator and handed by reference to every
+// InstanceDataMigrator.
+// NOTE(review): `mutex` presumably guards the two sets below -- confirm once
+// the code that fills them lands.
+// NOTE(review): std::mutex is used here, but <mutex> is not among the headers
+// this file includes directly.
+struct SnapshotDataMigrateContext {
+    std::mutex mutex;
+
+    // The indexes that have been migrated.
+    std::unordered_set<int64_t> migrated_indexes;
+    // The partitions that have been migrated.
+    std::unordered_set<int64_t> migrated_partitions;
+};
+
+// Background service that migrates instances to the snapshot-capable layout.
+//
+// start() spawns one instance scanner thread, one job-lease thread and a
+// configurable number of migration workers. The scanner feeds the pending
+// queue, the workers drain it and run one InstanceDataMigrator per instance.
+class SnapshotDataMigrator {
+public:
+    explicit SnapshotDataMigrator(std::shared_ptr<TxnKv> txn_kv);
+    ~SnapshotDataMigrator();
+
+    // returns 0 for success otherwise error
+    int start();
+
+    // Raises the stop flag, wakes the workers and joins them.
+    void stop();
+
+    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+private:
+    void scan_instance_loop();
+    void migration_loop();
+    void lease_migration_jobs();
+
+    bool is_instance_need_migrate(const InstanceInfoPB& instance_info);
+
+    // Shared with every InstanceDataMigrator by reference. Declared first, so
+    // it is destroyed after the migrators held in migrating_instance_map_.
+    SnapshotDataMigrateContext migrate_context_;
+
+    std::shared_ptr<TxnKv> txn_kv_;
+    std::atomic_bool stopped_ {false};
+    // Passed as the owner identity when preparing and leasing jobs.
+    // NOTE(review): no assignment to ip_port_ is visible in this class, so it
+    // looks like it stays empty -- confirm.
+    std::string ip_port_;
+    std::vector<std::thread> workers_;
+
+    // Guards the pending queue/set and migrating_instance_map_; also the mutex
+    // used with both condition variables.
+    std::mutex mtx_;
+    // notify migration workers
+    std::condition_variable pending_instance_cond_;
+    std::deque<InstanceInfoPB> pending_instance_queue_;
+    // Ids of the instances currently in pending_instance_queue_ (used to avoid
+    // enqueueing an instance twice).
+    std::unordered_set<std::string> pending_instance_set_;
+    // Instances being migrated by this process, keyed by instance id.
+    std::unordered_map<std::string, std::shared_ptr<InstanceDataMigrator>> 
migrating_instance_map_;
+    // notify instance scanner and lease thread
+    std::condition_variable notifier_;
+
+    // NOTE(review): the two members below are not referenced by the member
+    // functions of this class added in the same commit -- presumably reserved
+    // for later use.
+    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
+    std::shared_ptr<SnapshotManager> snapshot_manager_;
+};
+
+// Migrates a single instance. Created per job by SnapshotDataMigrator, which
+// also owns the SnapshotDataMigrateContext referenced here.
+class InstanceDataMigrator {
+public:
+    // `migrate_context` is stored by reference and must outlive this object.
+    InstanceDataMigrator(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& 
instance,
+                         SnapshotDataMigrateContext& migrate_context);
+    ~InstanceDataMigrator();
+
+    // The returned view refers to a member string, so it is only valid while
+    // this object is alive.
+    std::string_view instance_id() const { return instance_id_; }
+    const InstanceInfoPB& instance_info() const { return instance_info_; }
+
+    // returns 0 for success otherwise error
+    int init();
+
+    // Cooperative cancellation: stop() only raises an atomic flag and
+    // stopped() reads it; the running code has to poll stopped().
+    void stop() { stopped_.store(true, std::memory_order_release); }
+    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+    // returns 0 for success otherwise error
+    int do_migrate();
+
+    SnapshotDataMigrateContext& get_migrate_context() { return 
migrate_context_; }
+
+private:
+    // returns 0 for success otherwise error
+    int init_obj_store_accessors();
+
+    // returns 0 for success otherwise error
+    int init_storage_vault_accessors();
+
+    // Enable instance snapshot switch after data migration is done.
+    int enable_instance_snapshot_switch();
+
+    std::atomic_bool stopped_ {false};
+    std::shared_ptr<TxnKv> txn_kv_;
+    std::string instance_id_;
+    // Copy of the instance info taken at construction time.
+    InstanceInfoPB instance_info_;
+    SnapshotDataMigrateContext& migrate_context_;
+
+    // Storage accessors keyed by object-store info id or storage vault id.
+    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> 
accessor_map_;
+};
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/snapshot/snapshot_manager.cpp 
b/cloud/src/snapshot/snapshot_manager.cpp
index 2f38e36a575..d18e8dc3676 100644
--- a/cloud/src/snapshot/snapshot_manager.cpp
+++ b/cloud/src/snapshot/snapshot_manager.cpp
@@ -150,4 +150,12 @@ int 
SnapshotManager::recycle_snapshot_meta_and_data(std::string_view instance_id
     return 0;
 }
 
+// Base implementation: does no work, ignores `migrator` and reports success.
+int SnapshotManager::migrate_to_versioned_keys(InstanceDataMigrator* migrator) 
{
+    return 0;
+}
+
+// Base implementation: does no work, ignores `compactor` and reports success.
+int SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* 
compactor) {
+    return 0;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/snapshot/snapshot_manager.h 
b/cloud/src/snapshot/snapshot_manager.h
index 7413d8ca6bb..a4d4deed0a0 100644
--- a/cloud/src/snapshot/snapshot_manager.h
+++ b/cloud/src/snapshot/snapshot_manager.h
@@ -27,6 +27,8 @@ namespace doris::cloud {
 class InstanceRecycler;
 class InstanceChecker;
 class StorageVaultAccessor;
+class InstanceDataMigrator;
+class InstanceChainCompactor;
 
 // A abstract class for managing cluster snapshots.
 class SnapshotManager {
@@ -78,6 +80,14 @@ public:
     static bool parse_snapshot_versionstamp(std::string_view snapshot_id,
                                             Versionstamp* versionstamp);
 
+    // Migrate the single version keys to multi-version keys for the instance.
+    // Return 0 for success otherwise error.
+    virtual int migrate_to_versioned_keys(InstanceDataMigrator* migrator);
+
+    // Compact snapshot chains for the instance.
+    // Return 0 for success otherwise error.
+    virtual int compact_snapshot_chains(InstanceChainCompactor* compactor);
+
 private:
     SnapshotManager(const SnapshotManager&) = delete;
     SnapshotManager& operator=(const SnapshotManager&) = delete;
diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp
index 60fcc2b375b..142627e7fef 100644
--- a/cloud/test/keys_test.cpp
+++ b/cloud/test/keys_test.cpp
@@ -936,6 +936,54 @@ TEST(KeysTest, JobKeysTest) {
         EXPECT_EQ("check", dec_job_suffix);
         EXPECT_EQ(instance_id, dec_instance_id);
     }
+
+    // 0x01 "job" ${instance_id} "snapshot_data_migrator"                      
                -> JobSnapshotDataMigratorPB
+    {
+        JobSnapshotDataMigratorKeyInfo job_key {instance_id};
+        std::string encoded_job_key0;
+        job_snapshot_data_migrator_key(job_key, &encoded_job_key0);
+        std::cout << hex(encoded_job_key0) << std::endl;
+
+        std::string dec_instance_id;
+
+        std::string_view key_sv(encoded_job_key0);
+        std::string dec_job_prefix;
+        std::string dec_job_suffix;
+
+        remove_user_space_prefix(&key_sv);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_job_suffix), 0);
+        ASSERT_TRUE(key_sv.empty());
+
+        EXPECT_EQ("job", dec_job_prefix);
+        EXPECT_EQ("snapshot_data_migrator", dec_job_suffix);
+        EXPECT_EQ(instance_id, dec_instance_id);
+    }
+
+    // 0x01 "job" ${instance_id} "snapshot_chain_compactor"                    
                -> JobSnapshotChainCompactorPB
+    {
+        JobSnapshotChainCompactorKeyInfo job_key {instance_id};
+        std::string encoded_job_key0;
+        job_snapshot_chain_compactor_key(job_key, &encoded_job_key0);
+        std::cout << hex(encoded_job_key0) << std::endl;
+
+        std::string dec_instance_id;
+
+        std::string_view key_sv(encoded_job_key0);
+        std::string dec_job_prefix;
+        std::string dec_job_suffix;
+
+        remove_user_space_prefix(&key_sv);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_job_suffix), 0);
+        ASSERT_TRUE(key_sv.empty());
+
+        EXPECT_EQ("job", dec_job_prefix);
+        EXPECT_EQ("snapshot_chain_compactor", dec_job_suffix);
+        EXPECT_EQ(instance_id, dec_instance_id);
+    }
 }
 
 TEST(KeysTest, SystemKeysTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7ce5bd13d91..04b55662f18 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -68,6 +68,32 @@ enum SnapshotSwitchStatus {
     SNAPSHOT_SWITCH_ON = 2;
 }
 
+// Identifies a family of meta-store keys, split into single-version and
+// multi-version groups.
+// NOTE(review): the values 8, 9 and 11 are unassigned -- confirm whether they
+// are intentionally left free.
+enum KeySetType {
+    UNKNOWN_KEY_SET = 0;
+
+    // Single version keys
+    SINGLE_VERSION_PARTITION_VERSION = 1;
+    SINGLE_VERSION_TABLE_VERSION = 2;
+    SINGLE_VERSION_META_ROWSET = 3;
+    SINGLE_VERSION_META_TABLET = 4;
+    SINGLE_VERSION_META_SCHEMA = 5;
+    SINGLE_VERSION_TABLET_STATS = 6;
+    SINGLE_VERSION_TABLET_INDEX = 7;
+
+    // Multi version keys
+    MULTI_VERSION_PARTITION_VERSION = 10;
+    MULTI_VERSION_TABLE_VERSION = 12;
+    MULTI_VERSION_INDEX_PARTITION = 13;
+    MULTI_VERSION_INDEX_INDEX = 14;
+    MULTI_VERSION_INDEX_TABLET = 15;
+    MULTI_VERSION_TABLET_STATS = 16;
+    MULTI_VERSION_META_PARTITION = 17;
+    MULTI_VERSION_META_INDEX = 18;
+    MULTI_VERSION_META_TABLET = 19;
+    MULTI_VERSION_META_SCHEMA = 20;
+    MULTI_VERSION_META_ROWSET = 21;
+}
+
 message InstanceInfoPB {
     enum Status {
         NORMAL = 0;
@@ -108,6 +134,9 @@ message InstanceInfoPB {
     // Snapshot properties.
     optional int64 max_reserved_snapshot = 117;
     optional int64 snapshot_interval_seconds = 118;
+
+    repeated KeySetType migrated_key_sets = 119;    // convert single version 
keys to multi version keys
+    repeated KeySetType compacted_key_sets = 120;   // compact snapshot chain 
...
 }
 
 message StagePB {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to