This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e140cf85b3 [feat](snapshot) support snapshot chain compactor (#57801)
5e140cf85b3 is described below

commit 5e140cf85b3cfb3f95d7acfe8314b4fa5e560025
Author: meiyi <[email protected]>
AuthorDate: Sat Nov 8 03:18:39 2025 +0800

    [feat](snapshot) support snapshot chain compactor (#57801)
---
 cloud/src/common/util.cpp                       |  10 +-
 cloud/src/meta-store/keys.cpp                   | 155 +++++++++++++++++++++++-
 cloud/src/meta-store/keys.h                     |  26 +++-
 cloud/src/recycler/snapshot_chain_compactor.cpp | 141 ++++++++++++++++++++-
 cloud/test/document_message_test.cpp            |  23 +++-
 cloud/test/meta_reader_test.cpp                 |  22 ++++
 6 files changed, 369 insertions(+), 8 deletions(-)

diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp
index 7c3d8aeeff2..5cc942ea59f 100644
--- a/cloud/src/common/util.cpp
+++ b/cloud/src/common/util.cpp
@@ -208,9 +208,13 @@ std::string prettify_key(std::string_view key_hex, bool 
unicode) {
     fields_pos.push_back(0);
 
     for (auto& i : fields) {
-        fields_str.emplace_back(std::get<1>(i) == EncodingTag::BYTES_TAG
-                                        ? std::get<std::string>(std::get<0>(i))
-                                        : 
std::to_string(std::get<int64_t>(std::get<0>(i))));
+        if (std::get<1>(i) == EncodingTag::BYTES_TAG) {
+            fields_str.emplace_back(std::get<std::string>(std::get<0>(i)));
+        } else if (std::get<1>(i) == EncodingTag::VERSIONSTAMP_TAG) {
+            fields_str.emplace_back("versionstamp: " + 
std::get<std::string>(std::get<0>(i)));
+        } else {
+            
fields_str.emplace_back(std::to_string(std::get<int64_t>(std::get<0>(i))));
+        }
         fields_pos.push_back((std::get<2>(i) + 1) * 2);
     }
 
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index aabdf01f042..b87fa12e5e5 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -846,6 +846,15 @@ std::string snapshot_reference_key_prefix(std::string_view 
instance_id, Versions
     return out;
 }
 
+// Key prefix shared by every snapshot reference key of `instance_id`, independent of
+// the snapshot timestamp:
+//   0x03 "snapshot" ${instance_id} "reference"
+// Suitable as the begin key of a range scan over all references of that instance.
+std::string snapshot_reference_key_prefix(std::string_view instance_id) {
+    std::string prefix(1, static_cast<char>(CLOUD_VERSIONED_KEY_SPACE03));
+    encode_bytes(SNAPSHOT_KEY_PREFIX, &prefix);
+    encode_bytes(instance_id, &prefix);
+    encode_bytes(SNAPSHOT_REFERENCE_KEY_INFIX, &prefix);
+    return prefix;
+}
+
 
//==============================================================================
 // Log keys
 
//==============================================================================
@@ -861,7 +870,8 @@ void log_key(const LogKeyInfo& in, std::string* out) {
 // Decode keys
 
//==============================================================================
 int decode_key(std::string_view* in,
-               std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>>* out) {
+               std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>>* out,
+               Versionstamp* timestamp) {
     int pos = 0;
     int last_len = static_cast<int>(in->size());
     while (!in->empty()) {
@@ -878,6 +888,16 @@ int decode_key(std::string_view* in,
             ret = decode_int64(in, &v);
             if (ret != 0) return ret;
             out->emplace_back(v, tag, pos);
+        } else if (tag == EncodingTag::VERSIONSTAMP_TAG) {
+            Versionstamp vs;
+            ret = decode_versionstamp(in, &vs);
+            if (ret != 0) return ret;
+            if (timestamp) {
+                *timestamp = vs;
+            }
+            ret = decode_tailing_versionstamp_end(in);
+            if (ret != 0) return ret;
+            out->emplace_back(vs.to_string(), tag, pos);
         } else {
             return -1;
         }
@@ -915,6 +935,33 @@ std::vector<std::string> 
get_single_version_meta_key_prefixs() {
 }
 
 namespace versioned {
+
+bool decode_table_version_key(std::string_view* in, int64_t* table_id, Versionstamp* timestamp) {
+    // Expected layout: 0x03 "version" ${instance_id} "table" ${table_id} ${timestamp}
+    // `in` is advanced past whatever decode_key consumed, and `*timestamp` may already be
+    // written by decode_key, even when false is returned.
+    if (in->empty() || static_cast<uint8_t>(in->front()) != CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields, timestamp) != 0 || fields.size() != 5) {
+        return false;
+    }
+
+    // Type-check each field with get_if rather than catching bad_variant_access.
+    const auto* prefix = std::get_if<std::string>(&std::get<0>(fields[0]));
+    const auto* infix = std::get_if<std::string>(&std::get<0>(fields[2]));
+    const auto* decoded_id = std::get_if<int64_t>(&std::get<0>(fields[3]));
+    if (prefix == nullptr || *prefix != VERSION_KEY_PREFIX || infix == nullptr ||
+        *infix != TABLE_VERSION_KEY_INFIX || decoded_id == nullptr) {
+        return false;
+    }
+    *table_id = *decoded_id;
+    return true;
+}
+
 bool decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id, 
int64_t* table_id,
                                          int64_t* partition_id) {
     if (in->empty() || static_cast<uint8_t>((*in)[0]) != 
CLOUD_VERSIONED_KEY_SPACE03) {
@@ -943,6 +990,112 @@ bool 
decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id, i
     return true;
 }
 
+bool decode_meta_partition_key(std::string_view* in, int64_t* partition_id,
+                               Versionstamp* timestamp) {
+    // Expected layout: 0x03 "meta" ${instance_id} "partition" ${partition_id} ${timestamp}
+    // `in` is advanced past whatever decode_key consumed, and `*timestamp` may already be
+    // written by decode_key, even when false is returned.
+    if (in->empty() || static_cast<uint8_t>(in->front()) != CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields, timestamp) != 0 || fields.size() != 5) {
+        return false;
+    }
+
+    // Type-check each field with get_if rather than catching bad_variant_access.
+    const auto* prefix = std::get_if<std::string>(&std::get<0>(fields[0]));
+    const auto* infix = std::get_if<std::string>(&std::get<0>(fields[2]));
+    const auto* decoded_id = std::get_if<int64_t>(&std::get<0>(fields[3]));
+    if (prefix == nullptr || *prefix != META_KEY_PREFIX || infix == nullptr ||
+        *infix != META_PARTITION_KEY_INFIX || decoded_id == nullptr) {
+        return false;
+    }
+    *partition_id = *decoded_id;
+    return true;
+}
+
+bool decode_meta_index_key(std::string_view* in, int64_t* index_id, Versionstamp* timestamp) {
+    // Expected layout: 0x03 "meta" ${instance_id} "index" ${index_id} ${timestamp}
+    // `in` is advanced past whatever decode_key consumed, and `*timestamp` may already be
+    // written by decode_key, even when false is returned.
+    if (in->empty() || static_cast<uint8_t>(in->front()) != CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields, timestamp) != 0 || fields.size() != 5) {
+        return false;
+    }
+
+    // Type-check each field with get_if rather than catching bad_variant_access.
+    const auto* prefix = std::get_if<std::string>(&std::get<0>(fields[0]));
+    const auto* infix = std::get_if<std::string>(&std::get<0>(fields[2]));
+    const auto* decoded_id = std::get_if<int64_t>(&std::get<0>(fields[3]));
+    if (prefix == nullptr || *prefix != META_KEY_PREFIX || infix == nullptr ||
+        *infix != META_INDEX_KEY_INFIX || decoded_id == nullptr) {
+        return false;
+    }
+    *index_id = *decoded_id;
+    return true;
+}
+
+bool decode_meta_schema_key(std::string_view* in, int64_t* index_id, int64_t* schema_version) {
+    // Expected layout: 0x03 "meta" ${instance_id} "schema" ${index_id} ${schema_version}
+    // Only a lower bound on the field count is enforced (>= 5). This is looser than the
+    // exactly-5 check in decode_table_version_key / decode_meta_partition_key /
+    // decode_meta_index_key.
+    // NOTE(review): presumably intentional, so that keys carrying extra trailing fields
+    // under the same schema key still decode. Confirm before tightening.
+    // `in` is advanced past whatever decode_key consumed, even when false is returned.
+    if (in->empty() || static_cast<uint8_t>(in->front()) != CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields) != 0 || fields.size() < 5) {
+        return false;
+    }
+
+    const auto* prefix = std::get_if<std::string>(&std::get<0>(fields[0]));
+    const auto* infix = std::get_if<std::string>(&std::get<0>(fields[2]));
+    if (prefix == nullptr || *prefix != META_KEY_PREFIX || infix == nullptr ||
+        *infix != META_KEY_INFIX_SCHEMA) {
+        return false;
+    }
+
+    const auto* decoded_index_id = std::get_if<int64_t>(&std::get<0>(fields[3]));
+    if (decoded_index_id == nullptr) {
+        return false;
+    }
+    // Written before the version is checked, so a bad version field still leaves
+    // *index_id updated.
+    *index_id = *decoded_index_id;
+
+    const auto* decoded_version = std::get_if<int64_t>(&std::get<0>(fields[4]));
+    if (decoded_version == nullptr) {
+        return false;
+    }
+    *schema_version = *decoded_version;
+    return true;
+}
+
+bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id, Versionstamp* timestamp) {
+    // Expected layout: 0x03 "meta" ${instance_id} "tablet" ${tablet_id} ${timestamp}
+    // Only a lower bound on the field count is enforced (>= 5), unlike the exactly-5 check
+    // in decode_meta_partition_key / decode_meta_index_key.
+    // NOTE(review): presumably intentional; confirm before tightening.
+    // `in` is advanced past whatever decode_key consumed, and `*timestamp` may already be
+    // written by decode_key, even when false is returned.
+    if (in->empty() || static_cast<uint8_t>(in->front()) != CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields, timestamp) != 0 || fields.size() < 5) {
+        return false;
+    }
+
+    // Type-check each field with get_if rather than catching bad_variant_access.
+    const auto* prefix = std::get_if<std::string>(&std::get<0>(fields[0]));
+    const auto* infix = std::get_if<std::string>(&std::get<0>(fields[2]));
+    const auto* decoded_id = std::get_if<int64_t>(&std::get<0>(fields[3]));
+    if (prefix == nullptr || *prefix != META_KEY_PREFIX || infix == nullptr ||
+        *infix != META_KEY_INFIX_TABLET || decoded_id == nullptr) {
+        return false;
+    }
+    *tablet_id = *decoded_id;
+    return true;
+}
+
 bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
                              Versionstamp* timestamp, std::string* 
ref_instance_id) {
     // Key format: 0x03 + encode_bytes("snapshot") + encode_bytes(instance_id) 
+
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 93609a92b14..fb45dcd945c 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -525,6 +525,7 @@ static inline std::string snapshot_full_key(const 
SnapshotFullKeyInfo& in) { std
 void snapshot_reference_key(const SnapshotReferenceKeyInfo& in, std::string* 
out);
 static inline std::string snapshot_reference_key(const 
SnapshotReferenceKeyInfo& in) { std::string s; snapshot_reference_key(in, &s); 
return s; }
 std::string snapshot_reference_key_prefix(std::string_view instance_id, 
Versionstamp timestamp);
+std::string snapshot_reference_key_prefix(std::string_view instance_id);
 
 void log_key(const LogKeyInfo& in, std::string* out);
 static inline std::string log_key(const LogKeyInfo& in) { std::string s; 
log_key(in, &s); return s; }
@@ -539,10 +540,12 @@ static inline std::string log_key(const LogKeyInfo& in) { 
std::string s; log_key
  *
  * @param in input byte stream, successfully decoded part will be consumed
  * @param out the vector of each <field decoded, field type and its position> 
in the input stream
+ * @param timestamp the timestamp of a versioned key
  * @return 0 for successful decoding of the entire input, otherwise error.
  */
 int decode_key(std::string_view* in,
-               std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>>* out);
+               std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>>* out,
+               Versionstamp* timestamp = nullptr);
 
 /**
  * Return the list of single version meta key prefixs.
@@ -551,11 +554,32 @@ std::vector<std::string> 
get_single_version_meta_key_prefixs();
 
 namespace versioned {
 
+// Decode table version key
+// Return true if decode successfully, otherwise false
+bool decode_table_version_key(std::string_view* in, int64_t* table_id, 
Versionstamp* timestamp);
+
 // Decode partition inverted index key
 // Return true if decode successfully, otherwise false
 bool decode_partition_inverted_index_key(std::string_view* in, int64_t* db_id, 
int64_t* table_id,
                                          int64_t* partition_id);
 
+// Decode meta partition key
+// Return true if decode successfully, otherwise false
+bool decode_meta_partition_key(std::string_view* in, int64_t* partition_id,
+                               Versionstamp* timestamp);
+
+// Decode meta index key
+// Return true if decode successfully, otherwise false
+bool decode_meta_index_key(std::string_view* in, int64_t* index_id, 
Versionstamp* timestamp);
+
+// Decode meta schema key
+// Return true if decode successfully, otherwise false
+bool decode_meta_schema_key(std::string_view* in, int64_t* index_id, int64_t* 
schema_version);
+
+// Decode meta tablet key
+// Return true if decode successfully, otherwise false
+bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id, 
Versionstamp* timestamp);
+
 // Decode snapshot reference key
 // Return true if decode successfully, otherwise false
 bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
diff --git a/cloud/src/recycler/snapshot_chain_compactor.cpp 
b/cloud/src/recycler/snapshot_chain_compactor.cpp
index 2809445b3c5..4332d067df8 100644
--- a/cloud/src/recycler/snapshot_chain_compactor.cpp
+++ b/cloud/src/recycler/snapshot_chain_compactor.cpp
@@ -180,8 +180,86 @@ void SnapshotChainCompactor::lease_compaction_jobs() {
     }
 }
 
+// Returns true when `instance_info` records both a non-empty source_instance_id and a
+// non-empty source_snapshot_id, i.e. the instance was created by cloning a snapshot.
+// Declared static: this is a file-local helper that no header in this change declares,
+// so external linkage would only expose it to cross-TU name collisions.
+static bool is_instance_cloned_from_snapshot(const InstanceInfoPB& instance_info) {
+    return instance_info.has_source_instance_id() && !instance_info.source_instance_id().empty() &&
+           instance_info.has_source_snapshot_id() && !instance_info.source_snapshot_id().empty();
+}
+
+// Reads the value stored under instance_key({instance_id}) in a fresh transaction and
+// parses it into `instance_info`.
+// Returns 0 on success.
+// Returns -1 if the transaction cannot be created or the get returns anything other
+// than TXN_OK.
+// Returns -2 if the stored bytes do not parse as InstanceInfoPB.
+// Declared static: this is a file-local helper with a very generic name that no header
+// in this change declares, so external linkage would invite cross-TU symbol collisions.
+static int get_instance_info(TxnKv* txn_kv, const std::string& instance_id,
+                             InstanceInfoPB& instance_info) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn";
+        return -1;
+    }
+
+    std::string key = instance_key({instance_id});
+    std::string val;
+    err = txn->get(key, &val);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get instance, instance_id=" << instance_id << " err=" << err;
+        return -1;
+    }
+    if (!instance_info.ParseFromString(val)) {
+        LOG(WARNING) << "failed to parse InstanceInfoPB, instance_id=" << instance_id;
+        return -2;
+    }
+    return 0;
+}
+
+// Sets *is_cloned to whether at least one snapshot reference key exists under
+// snapshot_reference_key_prefix(instance_id), i.e. whether any instance still
+// references a snapshot of `instance_id`. The range get is limited to a single key.
+// Returns 0 on success, or -1 on transaction or range-get failure (*is_cloned is not
+// written in that case).
+// Declared static: this is a file-local helper that no header in this change declares.
+static int is_instance_cloned(TxnKv* txn_kv, const std::string& instance_id, bool* is_cloned) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn";
+        return -1;
+    }
+
+    std::string snapshot_ref_key_start = versioned::snapshot_reference_key_prefix(instance_id);
+    // Appending 0xFF gives an end key past every key that extends the prefix.
+    std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF';
+    std::unique_ptr<RangeGetIterator> it;
+    err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, false, 1);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get snapshot reference key. instance_id=" << instance_id
+                     << ", err=" << err;
+        return -1;
+    }
+    *is_cloned = it->has_next();
+    return 0;
+}
+
 bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const InstanceInfoPB& instance_info) {
-    // TODO:
+    // An instance is picked for chain compaction only when all of these hold:
+    //   1. it was itself cloned from a snapshot;
+    //   2. its source instance was NOT cloned from a snapshot;
+    //   3. at least one snapshot reference key exists under this instance, i.e. another
+    //      instance was cloned from it.
+    // Example: for the clone chain [instance1 -> instance2 -> instance3], instance2 is
+    // compacted.
+    // A failure to read any of this metadata is logged and treated as "do not compact".
+    if (!is_instance_cloned_from_snapshot(instance_info)) {
+        return false;
+    }
+
+    const std::string& source_id = instance_info.source_instance_id();
+    InstanceInfoPB source_info;
+    if (get_instance_info(txn_kv_.get(), source_id, source_info) != 0) {
+        LOG(WARNING) << "failed to get source instance info, instance_id=" << source_id;
+        return false;
+    }
+    if (is_instance_cloned_from_snapshot(source_info)) {
+        // The source sits in the middle of a longer chain; leave this instance alone.
+        return false;
+    }
+
+    bool referenced_by_clones = false;
+    if (is_instance_cloned(txn_kv_.get(), instance_info.instance_id(), &referenced_by_clones) !=
+        0) {
+        LOG(WARNING) << "failed to check is instance cloned, instance_id="
+                     << instance_info.instance_id();
+        return false;
+    }
+    if (referenced_by_clones) return true;
     return false;
 }
 
@@ -317,7 +395,66 @@ int InstanceChainCompactor::do_compact() {
 }
 
 int InstanceChainCompactor::handle_compaction_completion() {
-    // TODO:
+    // Finalizes a finished chain compaction in one transaction:
+    //   - removes the snapshot reference key {source_instance_id, snapshot versionstamp,
+    //     instance_id_} that this instance holds on its source snapshot;
+    //   - clears source_snapshot_id and compacted_key_sets in the stored InstanceInfoPB;
+    //   - atomically increments system_meta_service_instance_update_key() and writes the
+    //     instance back.
+    // Returns -1 on any txn/get/parse/commit failure, 0 on success.
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create txn in chain compaction").tag("instance_id", instance_id_);
+        return -1;
+    }
+
+    const std::string instance_info_key = instance_key(instance_id_);
+    InstanceInfoPB instance_info;
+    {
+        std::string raw_instance;
+        err = txn->get(instance_info_key, &raw_instance);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get instance info in chain compaction")
+                    .tag("instance_id", instance_id_)
+                    .tag("error", err);
+            return -1;
+        }
+        if (!instance_info.ParseFromString(raw_instance)) {
+            LOG_WARNING("failed to parse instance info in chain compaction")
+                    .tag("instance_id", instance_id_);
+            return -1;
+        }
+    }
+
+    // Copied by value: source_snapshot_id is cleared below but is still logged afterwards.
+    const std::string source_instance_id = instance_info.source_instance_id();
+    const std::string snapshot_id = instance_info.source_snapshot_id();
+
+    Versionstamp snapshot_versionstamp;
+    if (!SnapshotManager::parse_snapshot_versionstamp(snapshot_id, &snapshot_versionstamp)) {
+        LOG_WARNING("failed to parse snapshot_id to versionstamp in chain compaction")
+                .tag("instance_id", instance_id_)
+                .tag("source_instance_id", source_instance_id)
+                .tag("snapshot_id", snapshot_id);
+        return -1;
+    }
+
+    versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, snapshot_versionstamp,
+                                                      instance_id_};
+    std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
+    txn->remove(reference_key);
+
+    // source_instance_id is deliberately left set; only the snapshot id and the compacted
+    // key sets are reset. NOTE(review): clearing source_instance_id was considered and left
+    // commented out originally; confirm the retained value is still wanted.
+    instance_info.clear_source_snapshot_id();
+    instance_info.clear_compacted_key_sets();
+    txn->atomic_add(system_meta_service_instance_update_key(), 1);
+    txn->put(instance_info_key, instance_info.SerializeAsString());
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to commit instance info in chain compaction")
+                .tag("instance_id", instance_id_)
+                .tag("source_instance_id", source_instance_id)
+                .tag("snapshot_id", snapshot_id)
+                .tag("error", err);
+        return -1;
+    }
+
+    LOG_INFO("finish chain compaction")
+            .tag("instance_id", instance_id_)
+            .tag("source_instance_id", source_instance_id)
+            .tag("snapshot_id", snapshot_id);
     return 0;
 }
 
diff --git a/cloud/test/document_message_test.cpp 
b/cloud/test/document_message_test.cpp
index d78309179cc..47a70ad9748 100644
--- a/cloud/test/document_message_test.cpp
+++ b/cloud/test/document_message_test.cpp
@@ -372,7 +372,7 @@ TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPB) {
         column->set_is_nullable(false);
     }
 
-    std::string schema_key = "tablet_schema_key_test";
+    std::string schema_key = versioned::meta_schema_key({"instance_id", 10, 
0});
     {
         // create a txn and put tablet schema
         doris::TabletSchemaCloudPB schema_copy(schema);
@@ -406,6 +406,27 @@ TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPB) {
 
     ASSERT_GE(txn_kv->total_kvs(), 2) << dump_range(txn_kv.get());
 
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string begin_key = versioned::meta_schema_key({"instance_id", 10, 
0});
+        std::string end_key = versioned::meta_schema_key({"instance_id", 10, 
INT64_MAX});
+        FullRangeGetOptions options;
+        std::unique_ptr<FullRangeGetIterator> iter =
+                txn->full_range_get(begin_key, end_key, std::move(options));
+        for (auto kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+            auto&& [key, _] = *kvp;
+            int64_t decode_index_id = -1;
+            int64_t schema_version = -1;
+            std::string_view key_view(key);
+            ASSERT_TRUE(versioned::decode_meta_schema_key(&key_view, 
&decode_index_id,
+                                                          &schema_version));
+            ASSERT_EQ(10, decode_index_id);
+            ASSERT_EQ(0, schema_version);
+        }
+    }
+
     {
         // create a txn and remove tablet schema
         std::unique_ptr<Transaction> txn;
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index 9e818c1f258..3b7076a9008 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -130,6 +130,28 @@ TEST(MetaReaderTest, GetTableVersion) {
     }
 
     ASSERT_LT(version1, version2);
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = versioned::table_version_key({instance_id, 
table_id});
+        std::string end_key = versioned::table_version_key({instance_id, 
table_id + 1});
+        FullRangeGetOptions options;
+        std::unique_ptr<FullRangeGetIterator> iter =
+                txn->full_range_get(begin_key, end_key, std::move(options));
+        int num = 1;
+        for (auto kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+            auto&& [key, _] = *kvp;
+            int64_t decoded_id;
+            Versionstamp decoded_version;
+            std::string_view key_view(key);
+            ASSERT_TRUE(
+                    versioned::decode_table_version_key(&key_view, 
&decoded_id, &decoded_version));
+            ASSERT_EQ(decoded_id, table_id);
+            ASSERT_EQ(decoded_version, num == 1 ? version1 : version2);
+            num++;
+        }
+    }
 }
 
 TEST(MetaReaderTest, BatchGetTableVersion) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to