This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 286b47f04a2 [feat](cloud) Add snapshot data migrator & chain compactor
skeleton (#56939)
286b47f04a2 is described below
commit 286b47f04a21069462896e826afb4e08a94c8370
Author: walter <[email protected]>
AuthorDate: Thu Oct 16 14:04:39 2025 +0800
[feat](cloud) Add snapshot data migrator & chain compactor skeleton (#56939)
---
cloud/src/common/config.h | 5 +
cloud/src/common/stopwatch.h | 4 +
cloud/src/meta-store/keys.cpp | 18 +-
cloud/src/meta-store/keys.h | 12 +-
cloud/src/recycler/recycler.cpp | 27 ++
cloud/src/recycler/recycler.h | 4 +
cloud/src/recycler/snapshot_chain_compactor.cpp | 323 +++++++++++++++++++++
cloud/src/recycler/snapshot_chain_compactor.h | 111 ++++++++
cloud/src/recycler/snapshot_data_migrator.cpp | 362 ++++++++++++++++++++++++
cloud/src/recycler/snapshot_data_migrator.h | 132 +++++++++
cloud/src/snapshot/snapshot_manager.cpp | 8 +
cloud/src/snapshot/snapshot_manager.h | 10 +
cloud/test/keys_test.cpp | 48 ++++
gensrc/proto/cloud.proto | 29 ++
14 files changed, 1091 insertions(+), 2 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 317dbff7e6b..eaebca1f827 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -376,8 +376,13 @@ CONF_Bool(enable_multi_version_status, "false");
// New instance enable cluster snapshot, it only works when
enable_multi_version_status is true.
// The new instance snapshot switch status will be set to SNAPSHOT_SWITCH_ON,
and the auto snapshot will be open.
CONF_Bool(enable_cluster_snapshot, "false");
+// When true, Recycler::start() creates and starts the SnapshotDataMigrator.
+CONF_Bool(enable_snapshot_data_migrator, "false");
+// When true, Recycler::start() creates and starts the SnapshotChainCompactor.
+CONF_Bool(enable_snapshot_chain_compactor, "false");
+// Number of migration_loop worker threads spawned by SnapshotDataMigrator::start().
+CONF_Int32(snapshot_data_migrator_concurrent, "2");
+// Number of compaction_loop worker threads spawned by SnapshotChainCompactor::start().
+CONF_Int32(snapshot_chain_compactor_concurrent, "2");
CONF_mString(aws_credentials_provider_version, "v2");
CONF_Validator(aws_credentials_provider_version,
[](const std::string& config) -> bool { return config == "v1"
|| config == "v2"; });
+
} // namespace doris::cloud::config
diff --git a/cloud/src/common/stopwatch.h b/cloud/src/common/stopwatch.h
index 4ac9abaa7f6..fa9fcfa7667 100644
--- a/cloud/src/common/stopwatch.h
+++ b/cloud/src/common/stopwatch.h
@@ -66,6 +66,10 @@ public:
.count();
}
+ // Elapsed time in whole milliseconds: elapsed_us() divided by 1000 (integer division).
+ int64_t elapsed_ms() const { return elapsed_us() / 1000; }
+
+ // Elapsed time in whole seconds: elapsed_ms() divided by 1000 (integer division).
+ int64_t elapsed_seconds() const { return elapsed_ms() / 1000; }
+
private:
std::chrono::steady_clock::time_point start_;
std::chrono::steady_clock::duration elapsed_ {0};
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 53280b24d1f..153a936223c 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -68,6 +68,8 @@ static const char* JOB_KEY_INFIX_RL_PROGRESS =
"routine_load_progress
static const char* JOB_KEY_INFIX_STREAMING_JOB = "streaming_job";
static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet";
static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset";
+static const char* JOB_KEY_INFIX_SNAPSHOT_DATA_MIGRATOR =
"snapshot_data_migrator";
+static const char* JOB_KEY_INFIX_SNAPSHOT_CHAIN_COMPACTOR =
"snapshot_chain_compactor";
static const char* COPY_JOB_KEY_INFIX = "job";
static const char* COPY_FILE_KEY_INFIX = "loading_file";
@@ -146,7 +148,8 @@ static void encode_prefix(const T& t, std::string* key) {
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo,
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo,
RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo,
JobRestoreRowsetKeyInfo,
- JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
StreamingJobKeyInfo,
+ JobTabletKeyInfo, JobRecycleKeyInfo, JobSnapshotDataMigratorKeyInfo,
JobSnapshotChainCompactorKeyInfo,
+ RLJobProgressKeyInfo, StreamingJobKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo,
MetaSchemaPBDictionaryInfo,
MowTabletJobInfo>);
@@ -183,6 +186,8 @@ static void encode_prefix(const T& t, std::string* key) {
encode_bytes(STATS_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
|| std::is_same_v<T, JobRecycleKeyInfo>
+ || std::is_same_v<T, JobSnapshotDataMigratorKeyInfo>
+ || std::is_same_v<T, JobSnapshotChainCompactorKeyInfo>
|| std::is_same_v<T, RLJobProgressKeyInfo>
|| std::is_same_v<T, StreamingJobKeyInfo>) {
encode_bytes(JOB_KEY_PREFIX, key);
@@ -459,6 +464,17 @@ void job_check_key(const JobRecycleKeyInfo& in,
std::string* out) {
encode_bytes("check", out); // "check"
}
+// Encodes the per-instance job key of the snapshot data migrator into `out`.
+// Key layout: 0x01 "job" ${instance_id} "snapshot_data_migrator"
+void job_snapshot_data_migrator_key(const JobSnapshotDataMigratorKeyInfo& in, std::string* out) {
+    // Common job-space prefix: 0x01 "job" ${instance_id}
+    encode_prefix(in, out);
+    // Infix identifying this job kind: "snapshot_data_migrator"
+    encode_bytes(JOB_KEY_INFIX_SNAPSHOT_DATA_MIGRATOR, out);
+}
+
+// Encodes the per-instance job key of the snapshot chain compactor into `out`.
+// Key layout: 0x01 "job" ${instance_id} "snapshot_chain_compactor"
+void job_snapshot_chain_compactor_key(const JobSnapshotChainCompactorKeyInfo& in,
+                                      std::string* out) {
+    // Common job-space prefix: 0x01 "job" ${instance_id}
+    encode_prefix(in, out);
+    // Infix identifying this job kind: "snapshot_chain_compactor"
+    encode_bytes(JOB_KEY_INFIX_SNAPSHOT_CHAIN_COMPACTOR, out);
+}
+
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string*
out) {
encode_prefix(in, out); // 0x01 "job" ${instance_id}
encode_bytes(JOB_KEY_INFIX_RL_PROGRESS, out); // "routine_load_progress"
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index b529efa450f..965b3eb0c1a 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -66,8 +66,9 @@
// 0x01 "job" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id}
${tablet_id} -> TabletJobInfoPB
// 0x01 "job" ${instance_id} "recycle"
-> JobRecyclePB
// 0x01 "job" ${instance_id} "check"
-> JobRecyclePB
+// 0x01 "job" ${instance_id} "snapshot_data_migrator"
-> JobRecyclePB
+// 0x01 "job" ${instance_id} "snapshot_chain_compactor"
-> JobRecyclePB
// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id}
-> StreamingJobPB
-
//
// 0x01 "copy" ${instance_id} "job" ${stage_id} ${table_id} ${copy_id}
${group_id} -> CopyJobPB
// 0x01 "copy" ${instance_id} "loading_file" ${stage_id} ${table_id}
${obj_name} ${etag} -> CopyFilePB
@@ -205,6 +206,11 @@ using RecycleStageKeyInfo = BasicKeyInfo<19,
std::tuple<std::string, std::stri
// 0:instance_id
using JobRecycleKeyInfo = BasicKeyInfo<20 , std::tuple<std::string>>;
+// 0:instance_id
+using JobSnapshotDataMigratorKeyInfo = BasicKeyInfo<53,
std::tuple<std::string>>;
+// 0:instance_id
+using JobSnapshotChainCompactorKeyInfo = BasicKeyInfo<54,
std::tuple<std::string>>;
+
// 0:instance_id
1:index_id 2:schema_version
using MetaSchemaKeyInfo = BasicKeyInfo<21, std::tuple<std::string,
int64_t, int64_t>>;
@@ -408,7 +414,11 @@ static inline std::string job_restore_rowset_key(const
JobRestoreRowsetKeyInfo&
void job_recycle_key(const JobRecycleKeyInfo& in, std::string* out);
void job_check_key(const JobRecycleKeyInfo& in, std::string* out);
+void job_snapshot_data_migrator_key(const JobSnapshotDataMigratorKeyInfo& in,
std::string* out);
+void job_snapshot_chain_compactor_key(const JobSnapshotChainCompactorKeyInfo&
in, std::string* out);
static inline std::string job_check_key(const JobRecycleKeyInfo& in) {
std::string s; job_check_key(in, &s); return s; }
+static inline std::string job_snapshot_data_migrator_key(const
JobSnapshotDataMigratorKeyInfo& in) { std::string s;
job_snapshot_data_migrator_key(in, &s); return s; }
+static inline std::string job_snapshot_chain_compactor_key(const
JobSnapshotChainCompactorKeyInfo& in) { std::string s;
job_snapshot_chain_compactor_key(in, &s); return s; }
void job_tablet_key(const JobTabletKeyInfo& in, std::string* out);
static inline std::string job_tablet_key(const JobTabletKeyInfo& in) {
std::string s; job_tablet_key(in, &s); return s; }
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string*
out);
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 79fcbdcd45d..5f30536078f 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -415,6 +415,27 @@ int Recycler::start(brpc::Server* server) {
workers_.emplace_back(std::mem_fn(&Recycler::lease_recycle_jobs), this);
workers_.emplace_back(std::mem_fn(&Recycler::check_recycle_tasks), this);
+
+ if (config::enable_snapshot_data_migrator) {
+ snapshot_data_migrator_ =
std::make_shared<SnapshotDataMigrator>(txn_kv_);
+ int ret = snapshot_data_migrator_->start();
+ if (ret != 0) {
+ LOG(ERROR) << "failed to start snapshot data migrator";
+ return ret;
+ }
+ LOG(INFO) << "snapshot data migrator started";
+ }
+
+ if (config::enable_snapshot_chain_compactor) {
+ snapshot_chain_compactor_ =
std::make_shared<SnapshotChainCompactor>(txn_kv_);
+ int ret = snapshot_chain_compactor_->start();
+ if (ret != 0) {
+ LOG(ERROR) << "failed to start snapshot chain compactor";
+ return ret;
+ }
+ LOG(INFO) << "snapshot chain compactor started";
+ }
+
return 0;
}
@@ -434,6 +455,12 @@ void Recycler::stop() {
if (checker_) {
checker_->stop();
}
+ if (snapshot_data_migrator_) {
+ snapshot_data_migrator_->stop();
+ }
+ if (snapshot_chain_compactor_) {
+ snapshot_chain_compactor_->stop();
+ }
}
class InstanceRecycler::InvertedIndexIdCache {
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 2df94e63974..3d0b12da142 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -34,6 +34,8 @@
#include "common/bvars.h"
#include "meta-service/txn_lazy_committer.h"
#include "meta-store/versionstamp.h"
+#include "recycler/snapshot_chain_compactor.h"
+#include "recycler/snapshot_data_migrator.h"
#include "recycler/storage_vault_accessor.h"
#include "recycler/white_black_list.h"
#include "snapshot/snapshot_manager.h"
@@ -118,6 +120,8 @@ private:
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
std::shared_ptr<SnapshotManager> snapshot_manager_;
+ std::shared_ptr<SnapshotDataMigrator> snapshot_data_migrator_;
+ std::shared_ptr<SnapshotChainCompactor> snapshot_chain_compactor_;
};
enum class RowsetRecyclingState {
diff --git a/cloud/src/recycler/snapshot_chain_compactor.cpp
b/cloud/src/recycler/snapshot_chain_compactor.cpp
new file mode 100644
index 00000000000..f702f8326a8
--- /dev/null
+++ b/cloud/src/recycler/snapshot_chain_compactor.cpp
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/snapshot_chain_compactor.h"
+
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-store/keys.h"
+#include "mock_accessor.h"
+#include "recycler/hdfs_accessor.h"
+#include "recycler/s3_accessor.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+// Stores the shared TxnKv handle. No worker thread is created here; threads
+// are spawned by start().
+SnapshotChainCompactor::SnapshotChainCompactor(std::shared_ptr<TxnKv> txn_kv)
+ : txn_kv_(std::move(txn_kv)) {}
+
+// Stops and joins the worker threads unless stop() has already been requested.
+SnapshotChainCompactor::~SnapshotChainCompactor() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Spawns the background threads: one instance scanner, one lease renewer and
+// `config::snapshot_chain_compactor_concurrent` compaction workers.
+// Always returns 0.
+int SnapshotChainCompactor::start() {
+    workers_.emplace_back([this] { scan_instance_loop(); });
+    workers_.emplace_back([this] { lease_compaction_jobs(); });
+
+    int spawned = 0;
+    while (spawned < config::snapshot_chain_compactor_concurrent) {
+        workers_.emplace_back([this] { compaction_loop(); });
+        ++spawned;
+    }
+
+    LOG_INFO("snapshot chain compactor started")
+            .tag("concurrent", config::snapshot_chain_compactor_concurrent);
+    return 0;
+}
+
+// Requests shutdown of every background thread and blocks until all of them
+// have been joined. Also asks every in-flight InstanceChainCompactor to stop.
+// Safe to call more than once: already-joined threads are skipped.
+void SnapshotChainCompactor::stop() {
+    {
+        // The stop flag must be published while holding `mtx_`. The workers
+        // evaluate their wait predicates under this mutex; setting the flag
+        // without it lets a worker observe `stopped() == false`, then miss the
+        // notification sent before it actually blocks, and wait forever
+        // (compaction_loop waits without a timeout), which would hang join().
+        std::lock_guard lock(mtx_);
+        stopped_ = true;
+        for (auto& [_, compactor] : compacting_instance_map_) {
+            compactor->stop();
+        }
+    }
+    // Notify after releasing the lock so woken threads can take it immediately.
+    notifier_.notify_all();
+    pending_instance_cond_.notify_all();
+
+    for (auto& w : workers_) {
+        if (w.joinable()) w.join();
+    }
+}
+
+// Worker thread body. Repeatedly takes one instance from the pending queue,
+// builds an InstanceChainCompactor for it, acquires the per-instance job via
+// prepare_instance_recycle_job() and runs the compaction. Returns when the
+// compactor is stopped.
+void SnapshotChainCompactor::compaction_loop() {
+ while (!stopped()) {
+ // fetch the next pending instance to compact
+ InstanceInfoPB instance;
+ {
+ std::unique_lock lock(mtx_);
+ // Block until the scanner enqueues an instance or stop is requested.
+ pending_instance_cond_.wait(
+ lock, [&]() -> bool { return
!pending_instance_queue_.empty() || stopped(); });
+ if (stopped()) {
+ return;
+ }
+ instance = std::move(pending_instance_queue_.front());
+ pending_instance_queue_.pop_front();
+ // Removing the id lets the scanner enqueue this instance again later.
+ pending_instance_set_.erase(instance.instance_id());
+ }
+ const auto& instance_id = instance.instance_id();
+ {
+ std::lock_guard lock(mtx_);
+ // skip instance that is already being compacted by another worker
+ if (compacting_instance_map_.count(instance_id)) continue;
+ }
+
+ auto compactor = std::make_shared<InstanceChainCompactor>(txn_kv_,
instance);
+ if (compactor->init() != 0) {
+ LOG(WARNING) << "failed to init instance compactor, instance_id="
+ << instance.instance_id();
+ continue;
+ }
+
+ std::string job_key =
job_snapshot_chain_compactor_key(instance.instance_id());
+ int ret =
+ prepare_instance_recycle_job(txn_kv_.get(), job_key,
instance.instance_id(),
+ ip_port_,
config::recycle_job_lease_expired_ms * 1000);
+ if (ret != 0) { // Prepare failed
+ continue;
+ } else {
+ // Register the compactor so the lease thread renews its job and
+ // stop() can reach it.
+ std::lock_guard lock(mtx_);
+ compacting_instance_map_.emplace(instance_id, compactor);
+ }
+ // NOTE(review): returning here leaves the entry registered in
+ // compacting_instance_map_; it is only released with the owner.
+ if (stopped()) return;
+
+ // The return value of do_compact() is not inspected here.
+ compactor->do_compact();
+ {
+ std::lock_guard lock(mtx_);
+ compacting_instance_map_.erase(instance.instance_id());
+ }
+ }
+}
+
+// Scanner thread body. After an initial scheduling delay, periodically lists
+// all instances and enqueues those that are not deleted and whose snapshot
+// chain needs compaction, then wakes the compaction workers. Returns when the
+// compactor is stopped.
+void SnapshotChainCompactor::scan_instance_loop() {
+    {
+        // Initial scheduling delay. Wait on the notifier instead of sleeping
+        // unconditionally so that stop() does not have to sit out the whole
+        // delay before it can join this thread.
+        std::unique_lock lock(mtx_);
+        notifier_.wait_for(lock,
+                           std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds),
+                           [&]() { return stopped(); });
+    }
+    while (!stopped()) {
+        std::vector<InstanceInfoPB> instances;
+        // NOTE(review): the return value of get_all_instances() is ignored; on
+        // failure `instances` is presumably left empty -- confirm.
+        get_all_instances(txn_kv_.get(), instances);
+        LOG(INFO) << "Snapshot chain compactor get instances: " << [&instances] {
+            std::stringstream ss;
+            for (auto& i : instances) ss << ' ' << i.instance_id();
+            return ss.str();
+        }();
+        if (!instances.empty()) {
+            // enqueue instances
+            std::lock_guard lock(mtx_);
+            for (auto& instance : instances) {
+                if (instance.status() == InstanceInfoPB::DELETED) continue;
+                if (!is_snapshot_chain_need_compact(instance)) continue;
+                auto [_, success] = pending_instance_set_.insert(instance.instance_id());
+                // skip instance already in pending queue
+                if (success) {
+                    pending_instance_queue_.push_back(std::move(instance));
+                }
+            }
+            pending_instance_cond_.notify_all();
+        }
+        {
+            // Sleep until the next scan round or until stop is requested.
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock, std::chrono::seconds(config::recycle_interval_seconds),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Lease thread body. Every `recycle_job_lease_expired_ms / 3` milliseconds it
+// renews the job lease of each instance that is currently being compacted.
+// When lease_instance_recycle_job() returns 1 the corresponding instance
+// compactor is asked to stop (presumably the job no longer belongs to this
+// process -- confirm against lease_instance_recycle_job). Returns when the
+// compactor is stopped.
+void SnapshotChainCompactor::lease_compaction_jobs() {
+    while (!stopped()) {
+        // Snapshot the ids under the lock, then renew leases without holding
+        // it so that KV round trips do not block the workers.
+        std::vector<std::string> instances;
+        {
+            std::lock_guard lock(mtx_);
+            // size() must be read under the lock as well: the compaction
+            // workers insert into and erase from the map concurrently.
+            instances.reserve(compacting_instance_map_.size());
+            for (auto& [id, _] : compacting_instance_map_) {
+                instances.push_back(id);
+            }
+        }
+        for (auto& i : instances) {
+            std::string job_key = job_snapshot_chain_compactor_key(i);
+            int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_);
+            if (ret == 1) {
+                std::lock_guard lock(mtx_);
+                // The worker may have finished and erased the entry meanwhile.
+                if (auto it = compacting_instance_map_.find(i);
+                    it != compacting_instance_map_.end()) {
+                    it->second->stop();
+                }
+            }
+        }
+        {
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock,
+                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Decides whether the snapshot chain of `instance_info` should be compacted.
+// Not implemented yet: always returns false, so scan_instance_loop() never
+// enqueues any instance.
+bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const
InstanceInfoPB& instance_info) {
+ // TODO:
+ return false;
+}
+
+// Keeps the shared TxnKv handle and a private copy of the instance info and
+// its id. Accessors are not created here; call init() before do_compact().
+InstanceChainCompactor::InstanceChainCompactor(std::shared_ptr<TxnKv> txn_kv,
+ const InstanceInfoPB& instance)
+ : txn_kv_(std::move(txn_kv)),
+ instance_id_(instance.instance_id()),
+ instance_info_(instance) {}
+
+// Raises the stop flag on destruction if it has not been raised already.
+InstanceChainCompactor::~InstanceChainCompactor() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Creates the storage accessors of the instance: first the ones described by
+// the instance's object store infos, then the ones of its storage vaults.
+// Returns 0 on success, otherwise the error of the step that failed.
+int InstanceChainCompactor::init() {
+    if (int rc = init_obj_store_accessors(); rc != 0) {
+        return rc;
+    }
+    return init_storage_vault_accessors();
+}
+
+// Creates one accessor per entry of instance_info_.obj_info() and registers
+// it in accessor_map_ under the object info id. In UNIT_TEST builds a
+// MockAccessor is used instead of a real S3Accessor.
+// Returns 0 on success, -1 when an S3 configuration cannot be derived from an
+// object info, or the error returned by S3Accessor::create().
+int InstanceChainCompactor::init_obj_store_accessors() {
+ for (const auto& obj_info : instance_info_.obj_info()) {
+#ifdef UNIT_TEST
+ auto accessor = std::make_shared<MockAccessor>();
+#else
+ auto s3_conf = S3Conf::from_obj_store_info(obj_info);
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init object accessor, instance_id=" <<
instance_id_;
+ return -1;
+ }
+
+ std::shared_ptr<S3Accessor> accessor;
+ int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init object accessor. instance_id=" <<
instance_id_
+ << " resource_id=" << obj_info.id();
+ return ret;
+ }
+#endif
+
+ accessor_map_.emplace(obj_info.id(), std::move(accessor));
+ }
+
+ return 0;
+}
+
+// Creates one accessor per storage vault of the instance and registers it in
+// accessor_map_ under the vault id. Does nothing when the instance has no
+// resource ids. Vaults are read with a full range scan over the instance's
+// storage vault keys.
+// Returns 0 on success; -1 when a vault value cannot be parsed, when an S3
+// configuration cannot be derived, or when the range iterator is invalid after
+// the scan; otherwise the error returned by the accessor initialization.
+int InstanceChainCompactor::init_storage_vault_accessors() {
+ if (instance_info_.resource_ids().empty()) {
+ return 0;
+ }
+
+ FullRangeGetOptions opts(txn_kv_);
+ opts.prefetch = true;
+ auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
+ storage_vault_key({instance_id_,
"\xff"}), std::move(opts));
+
+ for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
+ auto [k, v] = *kv;
+ StorageVaultPB vault;
+ if (!vault.ParseFromArray(v.data(), v.size())) {
+ LOG(WARNING) << "malformed storage vault, unable to deserialize
key=" << hex(k);
+ return -1;
+ }
+ // NOTE(review): the sync point name still carries the InstanceRecycler
+ // prefix; presumably kept so existing test hooks apply -- confirm.
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+ &accessor_map_, &vault);
+ if (vault.has_hdfs_info()) {
+#ifdef ENABLE_HDFS_STORAGE_VAULT
+ auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
+ int ret = accessor->init();
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init hdfs accessor. instance_id="
<< instance_id_
+ << " resource_id=" << vault.id() << " name=" <<
vault.name();
+ return ret;
+ }
+
+ accessor_map_.emplace(vault.id(), std::move(accessor));
+#else
+ // Without HDFS support the vault is only reported; no accessor is
+ // registered for it and the scan continues.
+ LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT
build option), "
+ << "but HDFS storage vaults were detected";
+#endif
+ } else if (vault.has_obj_info()) {
+#ifdef UNIT_TEST
+ auto accessor = std::make_shared<MockAccessor>();
+#else
+ auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init object accessor, instance_id="
<< instance_id_;
+ return -1;
+ }
+
+ std::shared_ptr<S3Accessor> accessor;
+ int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init s3 accessor. instance_id=" <<
instance_id_
+ << " resource_id=" << vault.id() << " name=" <<
vault.name();
+ return ret;
+ }
+#endif
+
+ accessor_map_.emplace(vault.id(), std::move(accessor));
+ }
+ }
+
+ // The loop also ends when the iterator hits an error, so check it here.
+ if (!it->is_valid()) {
+ LOG_WARNING("failed to get storage vault kv");
+ return -1;
+ }
+ return 0;
+}
+
+// Runs one snapshot chain compaction for this instance through a
+// SnapshotManager and then the completion handler. The elapsed time is logged
+// on every exit path.
+// Returns 0 on success, otherwise the error of the failing step.
+int InstanceChainCompactor::do_compact() {
+    LOG_WARNING("start snapshot chain compaction").tag("instance_id", instance_id_);
+
+    StopWatch timer;
+    DORIS_CLOUD_DEFER {
+        LOG_WARNING("snapshot chain compaction done")
+                .tag("instance_id", instance_id_)
+                .tag("cost(sec)", timer.elapsed_seconds());
+    };
+
+    SnapshotManager manager(txn_kv_);
+    if (int rc = manager.compact_snapshot_chains(this); rc != 0) {
+        LOG_WARNING("failed to compact snapshot chains").tag("instance_id", instance_id_);
+        return rc;
+    }
+    return handle_compaction_completion();
+}
+
+// Hook invoked by do_compact() after the snapshot chains were compacted
+// successfully. Not implemented yet: always returns 0.
+int InstanceChainCompactor::handle_compaction_completion() {
+ // TODO:
+ return 0;
+}
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/snapshot_chain_compactor.h
b/cloud/src/recycler/snapshot_chain_compactor.h
new file mode 100644
index 00000000000..f7d81ef95d2
--- /dev/null
+++ b/cloud/src/recycler/snapshot_chain_compactor.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "meta-service/txn_lazy_committer.h"
+#include "recycler/storage_vault_accessor.h"
+#include "snapshot/snapshot_manager.h"
+
+namespace doris::cloud {
+
+class InstanceChainCompactor;
+
+// Background service that compacts the snapshot chains of instances.
+// start() spawns one scanner thread, one lease-renewal thread and a
+// configurable number of compaction workers; stop() shuts them down and joins
+// them. The destructor calls stop() if it has not been called yet.
+class SnapshotChainCompactor {
+public:
+ explicit SnapshotChainCompactor(std::shared_ptr<TxnKv> txn_kv);
+ ~SnapshotChainCompactor();
+
+ // returns 0 for success otherwise error
+ int start();
+
+ // Requests shutdown and joins all worker threads.
+ void stop();
+
+ // True once stop() has been requested.
+ bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+private:
+ // Thread bodies; see the definitions for details.
+ void scan_instance_loop();
+ void compaction_loop();
+ void lease_compaction_jobs();
+
+ bool is_snapshot_chain_need_compact(const InstanceInfoPB& instance_info);
+
+ std::shared_ptr<TxnKv> txn_kv_;
+ std::atomic_bool stopped_ {false};
+ // Passed to the prepare/lease job helpers.
+ // NOTE(review): no assignment to ip_port_ is visible in this change.
+ std::string ip_port_;
+ std::vector<std::thread> workers_;
+
+ // Guards the pending queue/set and compacting_instance_map_.
+ std::mutex mtx_;
+ // notify compaction workers
+ std::condition_variable pending_instance_cond_;
+ std::deque<InstanceInfoPB> pending_instance_queue_;
+ // Ids of the instances currently in pending_instance_queue_ (de-duplication).
+ std::unordered_set<std::string> pending_instance_set_;
+ // Instances whose compaction is in progress, keyed by instance id.
+ std::unordered_map<std::string, std::shared_ptr<InstanceChainCompactor>>
+ compacting_instance_map_;
+ // notify instance scanner and lease thread
+ std::condition_variable notifier_;
+
+ // NOTE(review): the two members below are not referenced by the
+ // implementation visible in this change.
+ std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
+ std::shared_ptr<SnapshotManager> snapshot_manager_;
+};
+
+// Compacts the snapshot chains of a single instance. Holds a copy of the
+// instance info and the storage accessors created by init().
+class InstanceChainCompactor {
+public:
+ InstanceChainCompactor(std::shared_ptr<TxnKv> txn_kv, const
InstanceInfoPB& instance);
+ ~InstanceChainCompactor();
+
+ // The returned view refers to a member string and is valid only while this
+ // object is alive.
+ std::string_view instance_id() const { return instance_id_; }
+ const InstanceInfoPB& instance_info() const { return instance_info_; }
+
+ // Creates the storage accessors of the instance.
+ // returns 0 for success otherwise error
+ int init();
+
+ // Only raises the stop flag; it does not wait for do_compact() to return.
+ void stop() { stopped_.store(true, std::memory_order_release); }
+ bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+ // returns 0 for success otherwise error
+ int do_compact();
+
+private:
+ // returns 0 for success otherwise error
+ int init_obj_store_accessors();
+
+ // returns 0 for success otherwise error
+ int init_storage_vault_accessors();
+
+ // Called by do_compact() after a successful compaction.
+ int handle_compaction_completion();
+
+ std::atomic_bool stopped_ {false};
+ std::shared_ptr<TxnKv> txn_kv_;
+ std::string instance_id_;
+ InstanceInfoPB instance_info_;
+
+ // Accessors keyed by object info id / storage vault id.
+ std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>
accessor_map_;
+};
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/snapshot_data_migrator.cpp
b/cloud/src/recycler/snapshot_data_migrator.cpp
new file mode 100644
index 00000000000..57c495aa886
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_migrator.cpp
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/snapshot_data_migrator.h"
+
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/defer.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-store/keys.h"
+#include "mock_accessor.h"
+#include "recycler/hdfs_accessor.h"
+#include "recycler/s3_accessor.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+// Stores the shared TxnKv handle. No worker thread is created here; threads
+// are spawned by start().
+SnapshotDataMigrator::SnapshotDataMigrator(std::shared_ptr<TxnKv> txn_kv)
+ : txn_kv_(std::move(txn_kv)) {}
+
+// Stops and joins the worker threads unless stop() has already been requested.
+SnapshotDataMigrator::~SnapshotDataMigrator() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Spawns the background threads: one instance scanner, one lease renewer and
+// `config::snapshot_data_migrator_concurrent` migration workers.
+// Always returns 0.
+int SnapshotDataMigrator::start() {
+    workers_.emplace_back([this] { scan_instance_loop(); });
+    workers_.emplace_back([this] { lease_migration_jobs(); });
+
+    int spawned = 0;
+    while (spawned < config::snapshot_data_migrator_concurrent) {
+        workers_.emplace_back([this] { migration_loop(); });
+        ++spawned;
+    }
+
+    LOG_INFO("snapshot data migrator started")
+            .tag("concurrent", config::snapshot_data_migrator_concurrent);
+    return 0;
+}
+
+// Requests shutdown of every background thread and blocks until all of them
+// have been joined. Also asks every in-flight InstanceDataMigrator to stop.
+// Safe to call more than once: already-joined threads are skipped.
+void SnapshotDataMigrator::stop() {
+    {
+        // The stop flag must be published while holding `mtx_`. The workers
+        // evaluate their wait predicates under this mutex; setting the flag
+        // without it lets a worker observe `stopped() == false`, then miss the
+        // notification sent before it actually blocks, and wait forever
+        // (migration_loop waits without a timeout), which would hang join().
+        std::lock_guard lock(mtx_);
+        stopped_ = true;
+        for (auto& [_, migrator] : migrating_instance_map_) {
+            migrator->stop();
+        }
+    }
+    // Notify after releasing the lock so woken threads can take it immediately.
+    notifier_.notify_all();
+    pending_instance_cond_.notify_all();
+
+    for (auto& w : workers_) {
+        if (w.joinable()) w.join();
+    }
+}
+
+// Worker thread body. Repeatedly takes one instance from the pending queue,
+// builds an InstanceDataMigrator for it, acquires the per-instance job via
+// prepare_instance_recycle_job() and runs the migration. Returns when the
+// migrator is stopped.
+void SnapshotDataMigrator::migration_loop() {
+ while (!stopped()) {
+ // fetch the next pending instance to migrate
+ InstanceInfoPB instance;
+ {
+ std::unique_lock lock(mtx_);
+ // Block until the scanner enqueues an instance or stop is requested.
+ pending_instance_cond_.wait(
+ lock, [&]() -> bool { return
!pending_instance_queue_.empty() || stopped(); });
+ if (stopped()) {
+ return;
+ }
+ instance = std::move(pending_instance_queue_.front());
+ pending_instance_queue_.pop_front();
+ // Removing the id lets the scanner enqueue this instance again later.
+ pending_instance_set_.erase(instance.instance_id());
+ }
+ const auto& instance_id = instance.instance_id();
+ {
+ std::lock_guard lock(mtx_);
+ // skip instance that is already being migrated by another worker
+ if (migrating_instance_map_.count(instance_id)) continue;
+ }
+
+ auto migrator = std::make_shared<InstanceDataMigrator>(txn_kv_,
instance, migrate_context_);
+ if (migrator->init() != 0) {
+ LOG(WARNING) << "failed to init instance migrator, instance_id="
+ << instance.instance_id();
+ continue;
+ }
+
+ std::string job_key =
job_snapshot_data_migrator_key(instance.instance_id());
+ int ret =
+ prepare_instance_recycle_job(txn_kv_.get(), job_key,
instance.instance_id(),
+ ip_port_,
config::recycle_job_lease_expired_ms * 1000);
+ if (ret != 0) { // Prepare failed
+ continue;
+ } else {
+ // Register the migrator so the lease thread renews its job and
+ // stop() can reach it.
+ std::lock_guard lock(mtx_);
+ migrating_instance_map_.emplace(instance_id, migrator);
+ }
+ // NOTE(review): returning here leaves the entry registered in
+ // migrating_instance_map_; it is only released with the owner.
+ if (stopped()) return;
+
+ // The return value of do_migrate() is not inspected here.
+ migrator->do_migrate();
+ {
+ std::lock_guard lock(mtx_);
+ migrating_instance_map_.erase(instance.instance_id());
+ }
+ }
+}
+
+// Scanner thread body. After an initial scheduling delay, periodically lists
+// all instances and enqueues those that are not deleted and still need to be
+// migrated, then wakes the migration workers. Returns when the migrator is
+// stopped.
+void SnapshotDataMigrator::scan_instance_loop() {
+    {
+        // Initial scheduling delay. Wait on the notifier instead of sleeping
+        // unconditionally so that stop() does not have to sit out the whole
+        // delay before it can join this thread.
+        std::unique_lock lock(mtx_);
+        notifier_.wait_for(lock,
+                           std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds),
+                           [&]() { return stopped(); });
+    }
+    while (!stopped()) {
+        std::vector<InstanceInfoPB> instances;
+        // NOTE(review): the return value of get_all_instances() is ignored; on
+        // failure `instances` is presumably left empty -- confirm.
+        get_all_instances(txn_kv_.get(), instances);
+        LOG(INFO) << "Snapshot data migrator get instances: " << [&instances] {
+            std::stringstream ss;
+            for (auto& i : instances) ss << ' ' << i.instance_id();
+            return ss.str();
+        }();
+        if (!instances.empty()) {
+            // enqueue instances
+            std::lock_guard lock(mtx_);
+            for (auto& instance : instances) {
+                if (instance.status() == InstanceInfoPB::DELETED) continue;
+                if (!is_instance_need_migrate(instance)) continue;
+                auto [_, success] = pending_instance_set_.insert(instance.instance_id());
+                // skip instance already in pending queue
+                if (success) {
+                    pending_instance_queue_.push_back(std::move(instance));
+                }
+            }
+            pending_instance_cond_.notify_all();
+        }
+        {
+            // Sleep until the next scan round or until stop is requested.
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// Lease thread body. Every `recycle_job_lease_expired_ms / 3` milliseconds it
+// renews the job lease of each instance that is currently being migrated.
+// When lease_instance_recycle_job() returns 1 the corresponding instance
+// migrator is asked to stop (presumably the job no longer belongs to this
+// process -- confirm against lease_instance_recycle_job). Returns when the
+// migrator is stopped.
+void SnapshotDataMigrator::lease_migration_jobs() {
+    while (!stopped()) {
+        // Snapshot the ids under the lock, then renew leases without holding
+        // it so that KV round trips do not block the workers.
+        std::vector<std::string> instances;
+        {
+            std::lock_guard lock(mtx_);
+            // size() must be read under the lock as well: the migration
+            // workers insert into and erase from the map concurrently.
+            instances.reserve(migrating_instance_map_.size());
+            for (auto& [id, _] : migrating_instance_map_) {
+                instances.push_back(id);
+            }
+        }
+        for (auto& i : instances) {
+            std::string job_key = job_snapshot_data_migrator_key(i);
+            int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_);
+            if (ret == 1) {
+                std::lock_guard lock(mtx_);
+                // The worker may have finished and erased the entry meanwhile.
+                if (auto it = migrating_instance_map_.find(i);
+                    it != migrating_instance_map_.end()) {
+                    it->second->stop();
+                }
+            }
+        }
+        {
+            std::unique_lock lock(mtx_);
+            notifier_.wait_for(lock,
+                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
+                               [&]() { return stopped(); });
+        }
+    }
+}
+
+// An instance needs migration when the multi-version feature is enabled for it
+// but its snapshot switch is still in the DISABLED state, i.e. the snapshot
+// feature is not ready yet.
+bool SnapshotDataMigrator::is_instance_need_migrate(const InstanceInfoPB& instance_info) {
+    if (instance_info.multi_version_status() == MultiVersionStatus::MULTI_VERSION_DISABLED) {
+        return false;
+    }
+    return instance_info.snapshot_switch_status() ==
+           SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
+}
+
+// Keeps the shared TxnKv handle, a private copy of the instance info and its
+// id, and binds `migrate_context` (the initializer takes it from a reference;
+// the member's declaration is outside this view). Accessors are not created
+// here; call init() before do_migrate().
+InstanceDataMigrator::InstanceDataMigrator(std::shared_ptr<TxnKv> txn_kv,
+ const InstanceInfoPB& instance,
+ SnapshotDataMigrateContext&
migrate_context)
+ : txn_kv_(std::move(txn_kv)),
+ instance_id_(instance.instance_id()),
+ instance_info_(instance),
+ migrate_context_(migrate_context) {}
+
+// Raises the stop flag on destruction if it has not been raised already.
+InstanceDataMigrator::~InstanceDataMigrator() {
+    if (stopped()) {
+        return;
+    }
+    stop();
+}
+
+// Creates the storage accessors of the instance: first the ones described by
+// the instance's object store infos, then the ones of its storage vaults.
+// Returns 0 on success, otherwise the error of the step that failed.
+int InstanceDataMigrator::init() {
+    if (int rc = init_obj_store_accessors(); rc != 0) {
+        return rc;
+    }
+    return init_storage_vault_accessors();
+}
+
+// Creates one accessor per entry of instance_info_.obj_info() and registers
+// it in accessor_map_ under the object info id. In UNIT_TEST builds a
+// MockAccessor is used instead of a real S3Accessor.
+// Returns 0 on success, -1 when an S3 configuration cannot be derived from an
+// object info, or the error returned by S3Accessor::create().
+int InstanceDataMigrator::init_obj_store_accessors() {
+ for (const auto& obj_info : instance_info_.obj_info()) {
+#ifdef UNIT_TEST
+ auto accessor = std::make_shared<MockAccessor>();
+#else
+ auto s3_conf = S3Conf::from_obj_store_info(obj_info);
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init object accessor, instance_id=" <<
instance_id_;
+ return -1;
+ }
+
+ std::shared_ptr<S3Accessor> accessor;
+ int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init object accessor. instance_id=" <<
instance_id_
+ << " resource_id=" << obj_info.id();
+ return ret;
+ }
+#endif
+
+ accessor_map_.emplace(obj_info.id(), std::move(accessor));
+ }
+
+ return 0;
+}
+
+// Creates one accessor per storage vault of the instance and registers it in
+// accessor_map_ under the vault id. Does nothing when the instance has no
+// resource ids. Vaults are read with a full range scan over the instance's
+// storage vault keys.
+// Returns 0 on success; -1 when a vault value cannot be parsed, when an S3
+// configuration cannot be derived, or when the range iterator is invalid after
+// the scan; otherwise the error returned by the accessor initialization.
+int InstanceDataMigrator::init_storage_vault_accessors() {
+ if (instance_info_.resource_ids().empty()) {
+ return 0;
+ }
+
+ FullRangeGetOptions opts(txn_kv_);
+ opts.prefetch = true;
+ auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
+ storage_vault_key({instance_id_,
"\xff"}), std::move(opts));
+
+ for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
+ auto [k, v] = *kv;
+ StorageVaultPB vault;
+ if (!vault.ParseFromArray(v.data(), v.size())) {
+ LOG(WARNING) << "malformed storage vault, unable to deserialize
key=" << hex(k);
+ return -1;
+ }
+ // NOTE(review): the sync point name still carries the InstanceRecycler
+ // prefix; presumably kept so existing test hooks apply -- confirm.
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+ &accessor_map_, &vault);
+ if (vault.has_hdfs_info()) {
+#ifdef ENABLE_HDFS_STORAGE_VAULT
+ auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
+ int ret = accessor->init();
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init hdfs accessor. instance_id="
<< instance_id_
+ << " resource_id=" << vault.id() << " name=" <<
vault.name();
+ return ret;
+ }
+
+ accessor_map_.emplace(vault.id(), std::move(accessor));
+#else
+ // Without HDFS support the vault is only reported; no accessor is
+ // registered for it and the scan continues.
+ LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT
build option), "
+ << "but HDFS storage vaults were detected";
+#endif
+ } else if (vault.has_obj_info()) {
+#ifdef UNIT_TEST
+ auto accessor = std::make_shared<MockAccessor>();
+#else
+ auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init object accessor, instance_id="
<< instance_id_;
+ return -1;
+ }
+
+ std::shared_ptr<S3Accessor> accessor;
+ int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init s3 accessor. instance_id=" <<
instance_id_
+ << " resource_id=" << vault.id() << " name=" <<
vault.name();
+ return ret;
+ }
+#endif
+
+ accessor_map_.emplace(vault.id(), std::move(accessor));
+ }
+ }
+
+ // The loop also ends when the iterator hits an error, so check it here.
+ if (!it->is_valid()) {
+ LOG_WARNING("failed to get storage vault kv");
+ return -1;
+ }
+ return 0;
+}
+
+// Migrates this instance's data to versioned keys through a SnapshotManager
+// and, on success, updates the instance's snapshot switch. The elapsed time is
+// logged on every exit path.
+// Returns 0 on success, otherwise the error of the failing step.
+int InstanceDataMigrator::do_migrate() {
+    LOG_WARNING("start data migration").tag("instance_id", instance_id_);
+
+    StopWatch timer;
+    DORIS_CLOUD_DEFER {
+        LOG_WARNING("data migration done")
+                .tag("instance_id", instance_id_)
+                .tag("cost(sec)", timer.elapsed_seconds());
+    };
+
+    SnapshotManager manager(txn_kv_);
+    if (int rc = manager.migrate_to_versioned_keys(this); rc != 0) {
+        LOG_WARNING("failed to migrate snapshot keys").tag("instance_id", instance_id_);
+        return rc;
+    }
+    return enable_instance_snapshot_switch();
+}
+
+// Marks the migration of this instance as finished: within one transaction it
+// re-reads the instance info, sets its snapshot switch status to
+// SNAPSHOT_SWITCH_OFF and writes it back.
+// NOTE(review): despite the name the status becomes OFF, not ON; presumably
+// OFF means "snapshot supported but not turned on" as opposed to DISABLED --
+// confirm against the SnapshotSwitchStatus definition.
+// Returns 0 on success and -1 on any transaction or parse failure.
+int InstanceDataMigrator::enable_instance_snapshot_switch() {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn for data
migration").tag("instance_id", instance_id_);
+ return -1;
+ }
+
+ // Read the current value instead of reusing instance_info_, which is the
+ // copy taken when this migrator was constructed.
+ std::string key = instance_key(instance_id_);
+ std::string instance_value;
+ err = txn->get(key, &instance_value);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get instance info in data migration")
+ .tag("instance_id", instance_id_)
+ .tag("error", err);
+ return -1;
+ }
+
+ InstanceInfoPB instance_info;
+ if (!instance_info.ParseFromString(instance_value)) {
+ LOG_WARNING("failed to parse instance info in data migration")
+ .tag("instance_id", instance_id_);
+ return -1;
+ }
+
+ // NOTE(review): the current switch status is overwritten without being
+ // checked first.
+
instance_info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_OFF);
+ txn->put(key, instance_info.SerializeAsString());
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to commit instance info in data migration")
+ .tag("instance_id", instance_id_)
+ .tag("error", err);
+ return -1;
+ }
+
+ LOG_WARNING("finish data migration").tag("instance_id", instance_id_);
+
+ return 0;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/recycler/snapshot_data_migrator.h
b/cloud/src/recycler/snapshot_data_migrator.h
new file mode 100644
index 00000000000..ec4cae009f9
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_migrator.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "meta-service/txn_lazy_committer.h"
+#include "recycler/storage_vault_accessor.h"
+#include "snapshot/snapshot_manager.h"
+
+namespace doris::cloud {
+// class TxnKv;
+// class InstanceInfoPB;
+// class StorageVaultAccessor;
+// class SimpleThreadPool;
+// class Checker;
+// class SnapshotDataMigratorMetricsContext;
+class InstanceDataMigrator;
+
+struct SnapshotDataMigrateContext { // Migration bookkeeping; InstanceDataMigrator holds one of these by reference (its migrate_context_ member).
+ std::mutex mutex; // NOTE(review): presumably guards the two sets below - confirm. The mutex standard header is not included directly by this file.
+
+ // Indexes that have already been migrated (int64 values, presumably index ids).
+ std::unordered_set<int64_t> migrated_indexes;
+ // Partitions that have already been migrated (int64 values, presumably partition ids).
+ std::unordered_set<int64_t> migrated_partitions;
+};
+
+class SnapshotDataMigrator { // Presumably the driver that schedules per-instance migrations; only stopped() is defined inline, all other bodies live outside this header.
+public:
+ explicit SnapshotDataMigrator(std::shared_ptr<TxnKv> txn_kv);
+ ~SnapshotDataMigrator();
+
+ // Returns 0 on success, an error code otherwise.
+ int start();
+
+ void stop();
+
+ bool stopped() const { return stopped_.load(std::memory_order_acquire); } // Acquire-load of the stop flag.
+
+private:
+ void scan_instance_loop();
+ void migration_loop();
+ void lease_migration_jobs();
+
+ bool is_instance_need_migrate(const InstanceInfoPB& instance_info);
+
+ SnapshotDataMigrateContext migrate_context_; // Held by value; presumably the context handed to each InstanceDataMigrator - confirm.
+
+ std::shared_ptr<TxnKv> txn_kv_;
+ std::atomic_bool stopped_ {false}; // Starts false; read through stopped().
+ std::string ip_port_;
+ std::vector<std::thread> workers_;
+
+ std::mutex mtx_; // NOTE(review): presumably protects the pending and migrating containers below - confirm against the implementation.
+ // Notifies the migration workers.
+ std::condition_variable pending_instance_cond_;
+ std::deque<InstanceInfoPB> pending_instance_queue_;
+ std::unordered_set<std::string> pending_instance_set_; // presumably instance ids of the queued entries - confirm
+ std::unordered_map<std::string, std::shared_ptr<InstanceDataMigrator>>
migrating_instance_map_; // presumably keyed by instance id - confirm
+ // Notifies the instance scanner and the lease thread.
+ std::condition_variable notifier_;
+
+ std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
+ std::shared_ptr<SnapshotManager> snapshot_manager_;
+};
+
+class InstanceDataMigrator { // Migration state for one instance: its id, a copy of its InstanceInfoPB, a stop flag and a reference to a shared context.
+public:
+ InstanceDataMigrator(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB&
instance,
+ SnapshotDataMigrateContext& migrate_context);
+ ~InstanceDataMigrator();
+
+ std::string_view instance_id() const { return instance_id_; } // View into the instance_id_ member; valid only while this object is alive.
+ const InstanceInfoPB& instance_info() const { return instance_info_; }
+
+ // Returns 0 on success, an error code otherwise.
+ int init();
+
+ void stop() { stopped_.store(true, std::memory_order_release); } // Release-store that pairs with the acquire-load in stopped().
+ bool stopped() const { return stopped_.load(std::memory_order_acquire); }
+
+ // Returns 0 on success, an error code otherwise.
+ int do_migrate();
+
+ SnapshotDataMigrateContext& get_migrate_context() { return
migrate_context_; }
+
+private:
+ // Returns 0 on success, an error code otherwise.
+ int init_obj_store_accessors();
+
+ // Returns 0 on success, an error code otherwise.
+ int init_storage_vault_accessors();
+
+ // Updates the snapshot switch status in the stored instance record once data migration is done; returns 0 on success, -1 on failure.
+ int enable_instance_snapshot_switch();
+
+ std::atomic_bool stopped_ {false};
+ std::shared_ptr<TxnKv> txn_kv_;
+ std::string instance_id_;
+ InstanceInfoPB instance_info_;
+ SnapshotDataMigrateContext& migrate_context_; // Non-owning reference; the referenced context must outlive this object.
+
+ std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>
accessor_map_;
+};
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/snapshot/snapshot_manager.cpp
b/cloud/src/snapshot/snapshot_manager.cpp
index 2f38e36a575..d18e8dc3676 100644
--- a/cloud/src/snapshot/snapshot_manager.cpp
+++ b/cloud/src/snapshot/snapshot_manager.cpp
@@ -150,4 +150,12 @@ int
SnapshotManager::recycle_snapshot_meta_and_data(std::string_view instance_id
return 0;
}
+int SnapshotManager::migrate_to_versioned_keys(InstanceDataMigrator* migrator)
{
+ return 0; // Skeleton default: performs no migration, never touches migrator, and reports success.
+}
+
+int SnapshotManager::compact_snapshot_chains(InstanceChainCompactor*
compactor) {
+ return 0; // Skeleton default: performs no compaction, never touches compactor, and reports success.
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/snapshot/snapshot_manager.h
b/cloud/src/snapshot/snapshot_manager.h
index 7413d8ca6bb..a4d4deed0a0 100644
--- a/cloud/src/snapshot/snapshot_manager.h
+++ b/cloud/src/snapshot/snapshot_manager.h
@@ -27,6 +27,8 @@ namespace doris::cloud {
class InstanceRecycler;
class InstanceChecker;
class StorageVaultAccessor;
+class InstanceDataMigrator;
+class InstanceChainCompactor;
// A abstract class for managing cluster snapshots.
class SnapshotManager {
@@ -78,6 +80,14 @@ public:
static bool parse_snapshot_versionstamp(std::string_view snapshot_id,
Versionstamp* versionstamp);
+ // Migrate the single version keys to multi-version keys for the instance.
+ // Return 0 for success otherwise error.
+ virtual int migrate_to_versioned_keys(InstanceDataMigrator* migrator);
+
+ // Compact snapshot chains for the instance.
+ // Return 0 for success otherwise error.
+ virtual int compact_snapshot_chains(InstanceChainCompactor* compactor);
+
private:
SnapshotManager(const SnapshotManager&) = delete;
SnapshotManager& operator=(const SnapshotManager&) = delete;
diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp
index 60fcc2b375b..142627e7fef 100644
--- a/cloud/test/keys_test.cpp
+++ b/cloud/test/keys_test.cpp
@@ -936,6 +936,54 @@ TEST(KeysTest, JobKeysTest) {
EXPECT_EQ("check", dec_job_suffix);
EXPECT_EQ(instance_id, dec_instance_id);
}
+
+ // 0x01 "job" ${instance_id} "snapshot_data_migrator"
-> JobSnapshotDataMigratorPB
+ {
+ JobSnapshotDataMigratorKeyInfo job_key {instance_id};
+ std::string encoded_job_key0;
+ job_snapshot_data_migrator_key(job_key, &encoded_job_key0);
+ std::cout << hex(encoded_job_key0) << std::endl;
+
+ std::string dec_instance_id;
+
+ std::string_view key_sv(encoded_job_key0);
+ std::string dec_job_prefix;
+ std::string dec_job_suffix;
+
+ remove_user_space_prefix(&key_sv);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_job_suffix), 0);
+ ASSERT_TRUE(key_sv.empty());
+
+ EXPECT_EQ("job", dec_job_prefix);
+ EXPECT_EQ("snapshot_data_migrator", dec_job_suffix);
+ EXPECT_EQ(instance_id, dec_instance_id);
+ }
+
+ // 0x01 "job" ${instance_id} "snapshot_chain_compactor"
-> JobSnapshotChainCompactorPB
+ {
+ JobSnapshotChainCompactorKeyInfo job_key {instance_id};
+ std::string encoded_job_key0;
+ job_snapshot_chain_compactor_key(job_key, &encoded_job_key0);
+ std::cout << hex(encoded_job_key0) << std::endl;
+
+ std::string dec_instance_id;
+
+ std::string_view key_sv(encoded_job_key0);
+ std::string dec_job_prefix;
+ std::string dec_job_suffix;
+
+ remove_user_space_prefix(&key_sv);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_job_suffix), 0);
+ ASSERT_TRUE(key_sv.empty());
+
+ EXPECT_EQ("job", dec_job_prefix);
+ EXPECT_EQ("snapshot_chain_compactor", dec_job_suffix);
+ EXPECT_EQ(instance_id, dec_instance_id);
+ }
}
TEST(KeysTest, SystemKeysTest) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7ce5bd13d91..04b55662f18 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -68,6 +68,32 @@ enum SnapshotSwitchStatus {
SNAPSHOT_SWITCH_ON = 2;
}
+enum KeySetType { // Names one family of metadata keys; used by InstanceInfoPB.migrated_key_sets and compacted_key_sets.
+ UNKNOWN_KEY_SET = 0;
+
+ // Single-version key sets (values 1-7).
+ SINGLE_VERSION_PARTITION_VERSION = 1;
+ SINGLE_VERSION_TABLE_VERSION = 2;
+ SINGLE_VERSION_META_ROWSET = 3;
+ SINGLE_VERSION_META_TABLET = 4;
+ SINGLE_VERSION_META_SCHEMA = 5;
+ SINGLE_VERSION_TABLET_STATS = 6;
+ SINGLE_VERSION_TABLET_INDEX = 7;
+
+ // Multi-version key sets (values 10 and 12-21).
+ MULTI_VERSION_PARTITION_VERSION = 10;
+ MULTI_VERSION_TABLE_VERSION = 12; // NOTE(review): value 11 is unused - confirm the gap is intentional.
+ MULTI_VERSION_INDEX_PARTITION = 13;
+ MULTI_VERSION_INDEX_INDEX = 14;
+ MULTI_VERSION_INDEX_TABLET = 15;
+ MULTI_VERSION_TABLET_STATS = 16;
+ MULTI_VERSION_META_PARTITION = 17;
+ MULTI_VERSION_META_INDEX = 18;
+ MULTI_VERSION_META_TABLET = 19;
+ MULTI_VERSION_META_SCHEMA = 20;
+ MULTI_VERSION_META_ROWSET = 21;
+}
+
message InstanceInfoPB {
enum Status {
NORMAL = 0;
@@ -108,6 +134,9 @@ message InstanceInfoPB {
// Snapshot properties.
optional int64 max_reserved_snapshot = 117;
optional int64 snapshot_interval_seconds = 118;
+
+ repeated KeySetType migrated_key_sets = 119; // convert single version
keys to multi version keys
+ repeated KeySetType compacted_key_sets = 120; // compact snapshot chain
...
}
message StagePB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]