This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5e140cf85b3 [feat](snapshot) support snapshot chain compactor (#57801)
5e140cf85b3 is described below
commit 5e140cf85b3cfb3f95d7acfe8314b4fa5e560025
Author: meiyi <[email protected]>
AuthorDate: Sat Nov 8 03:18:39 2025 +0800
[feat](snapshot) support snapshot chain compactor (#57801)
---
cloud/src/common/util.cpp | 10 +-
cloud/src/meta-store/keys.cpp | 155 +++++++++++++++++++++++-
cloud/src/meta-store/keys.h | 26 +++-
cloud/src/recycler/snapshot_chain_compactor.cpp | 141 ++++++++++++++++++++-
cloud/test/document_message_test.cpp | 23 +++-
cloud/test/meta_reader_test.cpp | 22 ++++
6 files changed, 369 insertions(+), 8 deletions(-)
diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp
index 7c3d8aeeff2..5cc942ea59f 100644
--- a/cloud/src/common/util.cpp
+++ b/cloud/src/common/util.cpp
@@ -208,9 +208,13 @@ std::string prettify_key(std::string_view key_hex, bool
unicode) {
fields_pos.push_back(0);
for (auto& i : fields) {
- fields_str.emplace_back(std::get<1>(i) == EncodingTag::BYTES_TAG
- ? std::get<std::string>(std::get<0>(i))
- :
std::to_string(std::get<int64_t>(std::get<0>(i))));
+ if (std::get<1>(i) == EncodingTag::BYTES_TAG) {
+ fields_str.emplace_back(std::get<std::string>(std::get<0>(i)));
+ } else if (std::get<1>(i) == EncodingTag::VERSIONSTAMP_TAG) {
+ fields_str.emplace_back("versionstamp: " +
std::get<std::string>(std::get<0>(i)));
+ } else {
+
fields_str.emplace_back(std::to_string(std::get<int64_t>(std::get<0>(i))));
+ }
fields_pos.push_back((std::get<2>(i) + 1) * 2);
}
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index aabdf01f042..b87fa12e5e5 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -846,6 +846,15 @@ std::string snapshot_reference_key_prefix(std::string_view
instance_id, Versions
return out;
}
+std::string snapshot_reference_key_prefix(std::string_view instance_id) {
+ std::string out;
+ out.push_back(CLOUD_VERSIONED_KEY_SPACE03);
+ encode_bytes(SNAPSHOT_KEY_PREFIX, &out); // "snapshot"
+ encode_bytes(instance_id, &out); // instance_id
+ encode_bytes(SNAPSHOT_REFERENCE_KEY_INFIX, &out); // "reference"
+ return out;
+}
+
//==============================================================================
// Log keys
//==============================================================================
@@ -861,7 +870,8 @@ void log_key(const LogKeyInfo& in, std::string* out) {
// Decode keys
//==============================================================================
int decode_key(std::string_view* in,
- std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>>* out) {
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>>* out,
+ Versionstamp* timestamp) {
int pos = 0;
int last_len = static_cast<int>(in->size());
while (!in->empty()) {
@@ -878,6 +888,16 @@ int decode_key(std::string_view* in,
ret = decode_int64(in, &v);
if (ret != 0) return ret;
out->emplace_back(v, tag, pos);
+ } else if (tag == EncodingTag::VERSIONSTAMP_TAG) {
+ Versionstamp vs;
+ ret = decode_versionstamp(in, &vs);
+ if (ret != 0) return ret;
+ if (timestamp) {
+ *timestamp = vs;
+ }
+ ret = decode_tailing_versionstamp_end(in);
+ if (ret != 0) return ret;
+ out->emplace_back(vs.to_string(), tag, pos);
} else {
return -1;
}
@@ -915,6 +935,33 @@ std::vector<std::string>
get_single_version_meta_key_prefixs() {
}
namespace versioned {
+
+bool decode_table_version_key(std::string_view* in, int64_t* table_id,
Versionstamp* timestamp) {
+ // 0x03 "version" ${instance_id} "table" ${table_id} ${timestamp}
+ if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
+ return false;
+ }
+ in->remove_prefix(1);
+
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ auto res = decode_key(in, &out, timestamp);
+ if (res != 0 || out.size() != 5) {
+ return false;
+ }
+
+ try {
+ if (std::get<std::string>(std::get<0>(out[0])) != VERSION_KEY_PREFIX ||
+ std::get<std::string>(std::get<0>(out[2])) !=
TABLE_VERSION_KEY_INFIX) {
+ return false;
+ }
+ *table_id = std::get<int64_t>(std::get<0>(out[3]));
+ } catch (const std::bad_variant_access& e) {
+ return false;
+ }
+
+ return true;
+}
+
bool decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id,
int64_t* table_id,
int64_t* partition_id) {
if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
@@ -943,6 +990,112 @@ bool
decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id, i
return true;
}
+bool decode_meta_partition_key(std::string_view* in, int64_t* partition_id,
+ Versionstamp* timestamp) {
+ // 0x03 "meta" ${instance_id} "partition" ${partition_id} ${timestamp}
+ if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
+ return false;
+ }
+ in->remove_prefix(1);
+
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ auto res = decode_key(in, &out, timestamp);
+ if (res != 0 || out.size() != 5) {
+ return false;
+ }
+
+ try {
+ if (std::get<std::string>(std::get<0>(out[0])) != META_KEY_PREFIX ||
+ std::get<std::string>(std::get<0>(out[2])) !=
META_PARTITION_KEY_INFIX) {
+ return false;
+ }
+ *partition_id = std::get<int64_t>(std::get<0>(out[3]));
+ } catch (const std::bad_variant_access& e) {
+ return false;
+ }
+
+ return true;
+}
+
+bool decode_meta_index_key(std::string_view* in, int64_t* index_id,
Versionstamp* timestamp) {
+ // 0x03 "meta" ${instance_id} "index" ${index_id} ${timestamp}
+ if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
+ return false;
+ }
+ in->remove_prefix(1);
+
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ auto res = decode_key(in, &out, timestamp);
+ if (res != 0 || out.size() != 5) {
+ return false;
+ }
+
+ try {
+ if (std::get<std::string>(std::get<0>(out[0])) != META_KEY_PREFIX ||
+ std::get<std::string>(std::get<0>(out[2])) !=
META_INDEX_KEY_INFIX) {
+ return false;
+ }
+ *index_id = std::get<int64_t>(std::get<0>(out[3]));
+ } catch (const std::bad_variant_access& e) {
+ return false;
+ }
+
+ return true;
+}
+
+bool decode_meta_schema_key(std::string_view* in, int64_t* index_id, int64_t*
schema_version) {
+ // 0x03 "meta" ${instance_id} "schema" ${index_id} ${schema_version}
+ if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
+ return false;
+ }
+ in->remove_prefix(1);
+
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ auto res = decode_key(in, &out);
+ if (res != 0 || out.size() < 5) {
+ return false;
+ }
+
+ try {
+ if (std::get<std::string>(std::get<0>(out[0])) != META_KEY_PREFIX ||
+ std::get<std::string>(std::get<0>(out[2])) !=
META_KEY_INFIX_SCHEMA) {
+ return false;
+ }
+ *index_id = std::get<int64_t>(std::get<0>(out[3]));
+ *schema_version = std::get<int64_t>(std::get<0>(out[4]));
+ } catch (const std::bad_variant_access& e) {
+ return false;
+ }
+
+ return true;
+}
+
+bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id,
Versionstamp* timestamp) {
+ // 0x03 "meta" ${instance_id} "tablet" ${tablet_id} ${timestamp}
+ if (in->empty() || static_cast<uint8_t>((*in)[0]) !=
CLOUD_VERSIONED_KEY_SPACE03) {
+ return false;
+ }
+ in->remove_prefix(1);
+
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ auto res = decode_key(in, &out, timestamp);
+ if (res != 0 || out.size() < 5) {
+ return false;
+ }
+
+ try {
+ if (std::get<std::string>(std::get<0>(out[0])) != META_KEY_PREFIX ||
+ std::get<std::string>(std::get<0>(out[2])) !=
META_KEY_INFIX_TABLET) {
+ return false;
+ }
+ *tablet_id = std::get<int64_t>(std::get<0>(out[3]));
+ } catch (const std::bad_variant_access& e) {
+ return false;
+ }
+
+ return true;
+}
+
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
Versionstamp* timestamp, std::string*
ref_instance_id) {
// Key format: 0x03 + encode_bytes("snapshot") + encode_bytes(instance_id)
+
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 93609a92b14..fb45dcd945c 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -525,6 +525,7 @@ static inline std::string snapshot_full_key(const
SnapshotFullKeyInfo& in) { std
void snapshot_reference_key(const SnapshotReferenceKeyInfo& in, std::string*
out);
static inline std::string snapshot_reference_key(const
SnapshotReferenceKeyInfo& in) { std::string s; snapshot_reference_key(in, &s);
return s; }
std::string snapshot_reference_key_prefix(std::string_view instance_id,
Versionstamp timestamp);
+std::string snapshot_reference_key_prefix(std::string_view instance_id);
void log_key(const LogKeyInfo& in, std::string* out);
static inline std::string log_key(const LogKeyInfo& in) { std::string s;
log_key(in, &s); return s; }
@@ -539,10 +540,12 @@ static inline std::string log_key(const LogKeyInfo& in) {
std::string s; log_key
*
* @param in input byte stream, successfully decoded part will be consumed
* @param out the vector of each <field decoded, field type and its position>
in the input stream
+ * @param timestamp the timestamp of a versioned key
* @return 0 for successful decoding of the entire input, otherwise error.
*/
int decode_key(std::string_view* in,
- std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>>* out);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>>* out,
+ Versionstamp* timestamp = nullptr);
/**
* Return the list of single version meta key prefixs.
@@ -551,11 +554,32 @@ std::vector<std::string>
get_single_version_meta_key_prefixs();
namespace versioned {
+// Decode table version key
+// Return true if decode successfully, otherwise false
+bool decode_table_version_key(std::string_view* in, int64_t* table_id,
Versionstamp* timestamp);
+
// Decode partition inverted index key
// Return true if decode successfully, otherwise false
bool decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id,
int64_t* table_id,
int64_t* partition_id);
+// Decode meta partition key
+// Return true if decode successfully, otherwise false
+bool decode_meta_partition_key(std::string_view* in, int64_t* partition_id,
+ Versionstamp* timestamp);
+
+// Decode meta index key
+// Return true if decode successfully, otherwise false
+bool decode_meta_index_key(std::string_view* in, int64_t* index_id,
Versionstamp* timestamp);
+
+// Decode meta schema key
+// Return true if decode successfully, otherwise false
+bool decode_meta_schema_key(std::string_view* in, int64_t* index_id, int64_t*
schema_version);
+
+// Decode meta tablet key
+// Return true if decode successfully, otherwise false
+bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id,
Versionstamp* timestamp);
+
// Decode snapshot reference key
// Return true if decode successfully, otherwise false
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
diff --git a/cloud/src/recycler/snapshot_chain_compactor.cpp
b/cloud/src/recycler/snapshot_chain_compactor.cpp
index 2809445b3c5..4332d067df8 100644
--- a/cloud/src/recycler/snapshot_chain_compactor.cpp
+++ b/cloud/src/recycler/snapshot_chain_compactor.cpp
@@ -180,8 +180,86 @@ void SnapshotChainCompactor::lease_compaction_jobs() {
}
}
+bool is_instance_cloned_from_snapshot(const InstanceInfoPB& instance_info) {
+ return instance_info.has_source_instance_id() &&
!instance_info.source_instance_id().empty() &&
+ instance_info.has_source_snapshot_id() &&
!instance_info.source_snapshot_id().empty();
+}
+
+int get_instance_info(TxnKv* txn_kv, const std::string& instance_id,
+ InstanceInfoPB& instance_info) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn";
+ return -1;
+ }
+
+ std::string key = instance_key({instance_id});
+ std::string val;
+ err = txn->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to get instance, instance_id=" << instance_id
<< " err=" << err;
+ return -1;
+ }
+ if (!instance_info.ParseFromString(val)) {
+ LOG(WARNING) << "failed to parse InstanceInfoPB, instance_id=" <<
instance_id;
+ return -2;
+ }
+ return 0;
+}
+
+int is_instance_cloned(TxnKv* txn_kv, const std::string& instance_id, bool*
is_cloned) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn";
+ return -1;
+ }
+
+ std::string snapshot_ref_key_start =
versioned::snapshot_reference_key_prefix(instance_id);
+ std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF';
+ std::unique_ptr<RangeGetIterator> it;
+ err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, false,
1);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to get snapshot reference key. instance_id="
<< instance_id
+ << ", err=" << err;
+ return -1;
+ }
+ *is_cloned = it->has_next();
+ return 0;
+}
+
bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const
InstanceInfoPB& instance_info) {
- // TODO:
+ // compact the instance which meets the following conditions:
+ // 1. the instance is cloned from snapshot
+ // 2. its source instance is not cloned from other snapshots
+ // 3. the instance is cloned by other instances.
+ //
+ // for example, if the clone chain is [instance1 -> instance2 ->
instance3], compact the instance2
+ if (!is_instance_cloned_from_snapshot(instance_info)) {
+ return false;
+ }
+
+ InstanceInfoPB source_instance_info;
+ if (get_instance_info(txn_kv_.get(), instance_info.source_instance_id(),
+ source_instance_info) != 0) {
+ LOG(WARNING) << "failed to get source instance info, instance_id="
+ << instance_info.source_instance_id();
+ return false;
+ }
+ if (is_instance_cloned_from_snapshot(source_instance_info)) {
+ return false;
+ }
+
+ bool is_cloned = false;
+ if (is_instance_cloned(txn_kv_.get(), instance_info.instance_id(),
&is_cloned) != 0) {
+ LOG(WARNING) << "failed to check is instance cloned, instance_id="
+ << instance_info.instance_id();
+ return false;
+ }
+ if (is_cloned) {
+ return true;
+ }
return false;
}
@@ -317,7 +395,66 @@ int InstanceChainCompactor::do_compact() {
}
int InstanceChainCompactor::handle_compaction_completion() {
- // TODO:
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn in chain
compaction").tag("instance_id", instance_id_);
+ return -1;
+ }
+
+ std::string key = instance_key(instance_id_);
+ std::string instance_value;
+ err = txn->get(key, &instance_value);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get instance info in chain compaction")
+ .tag("instance_id", instance_id_)
+ .tag("error", err);
+ return -1;
+ }
+
+ InstanceInfoPB instance_info;
+ if (!instance_info.ParseFromString(instance_value)) {
+ LOG_WARNING("failed to parse instance info in chain compaction")
+ .tag("instance_id", instance_id_);
+ return -1;
+ }
+
+ Versionstamp snapshot_versionstamp;
+ if
(!SnapshotManager::parse_snapshot_versionstamp(instance_info.source_snapshot_id(),
+ &snapshot_versionstamp))
{
+ LOG_WARNING("failed to parse snapshot_id to versionstamp in chain
compaction")
+ .tag("instance_id", instance_id_)
+ .tag("source_instance_id", instance_info.source_instance_id())
+ .tag("snapshot_id", instance_info.source_snapshot_id());
+ return -1;
+ }
+ auto source_instance_id = instance_info.source_instance_id();
+ auto snapshot_id = instance_info.source_snapshot_id();
+ versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id,
snapshot_versionstamp,
+ instance_id_};
+ std::string reference_key =
versioned::snapshot_reference_key(ref_key_info);
+ txn->remove(reference_key);
+
+ // instance_info.clear_source_instance_id();
+ instance_info.clear_source_snapshot_id();
+ instance_info.clear_compacted_key_sets();
+ txn->atomic_add(system_meta_service_instance_update_key(), 1);
+ txn->put(key, instance_info.SerializeAsString());
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to commit instance info in chain compaction")
+ .tag("instance_id", instance_id_)
+ .tag("source_instance_id", source_instance_id)
+ .tag("snapshot_id", snapshot_id)
+ .tag("error", err);
+ return -1;
+ }
+
+ LOG_INFO("finish chain compaction")
+ .tag("instance_id", instance_id_)
+ .tag("source_instance_id", source_instance_id)
+ .tag("snapshot_id", snapshot_id);
return 0;
}
diff --git a/cloud/test/document_message_test.cpp
b/cloud/test/document_message_test.cpp
index d78309179cc..47a70ad9748 100644
--- a/cloud/test/document_message_test.cpp
+++ b/cloud/test/document_message_test.cpp
@@ -372,7 +372,7 @@ TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPB) {
column->set_is_nullable(false);
}
- std::string schema_key = "tablet_schema_key_test";
+ std::string schema_key = versioned::meta_schema_key({"instance_id", 10,
0});
{
// create a txn and put tablet schema
doris::TabletSchemaCloudPB schema_copy(schema);
@@ -406,6 +406,27 @@ TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPB) {
ASSERT_GE(txn_kv->total_kvs(), 2) << dump_range(txn_kv.get());
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string begin_key = versioned::meta_schema_key({"instance_id", 10,
0});
+ std::string end_key = versioned::meta_schema_key({"instance_id", 10,
INT64_MAX});
+ FullRangeGetOptions options;
+ std::unique_ptr<FullRangeGetIterator> iter =
+ txn->full_range_get(begin_key, end_key, std::move(options));
+ for (auto kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+ auto&& [key, _] = *kvp;
+ int64_t decode_index_id = -1;
+ int64_t schema_version = -1;
+ std::string_view key_view(key);
+ ASSERT_TRUE(versioned::decode_meta_schema_key(&key_view,
&decode_index_id,
+ &schema_version));
+ ASSERT_EQ(10, decode_index_id);
+ ASSERT_EQ(0, schema_version);
+ }
+ }
+
{
// create a txn and remove tablet schema
std::unique_ptr<Transaction> txn;
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index 9e818c1f258..3b7076a9008 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -130,6 +130,28 @@ TEST(MetaReaderTest, GetTableVersion) {
}
ASSERT_LT(version1, version2);
+
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string begin_key = versioned::table_version_key({instance_id,
table_id});
+ std::string end_key = versioned::table_version_key({instance_id,
table_id + 1});
+ FullRangeGetOptions options;
+ std::unique_ptr<FullRangeGetIterator> iter =
+ txn->full_range_get(begin_key, end_key, std::move(options));
+ int num = 1;
+ for (auto kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+ auto&& [key, _] = *kvp;
+ int64_t decoded_id;
+ Versionstamp decoded_version;
+ std::string_view key_view(key);
+ ASSERT_TRUE(
+ versioned::decode_table_version_key(&key_view,
&decoded_id, &decoded_version));
+ ASSERT_EQ(decoded_id, table_id);
+ ASSERT_EQ(decoded_version, num == 1 ? version1 : version2);
+ num++;
+ }
+ }
}
TEST(MetaReaderTest, BatchGetTableVersion) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]