This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d74a0ea609 [chore](cloud) support large operation log (#58100)
1d74a0ea609 is described below

commit 1d74a0ea60966f15ab26768edf9a95813d421941
Author: walter <[email protected]>
AuthorDate: Wed Nov 19 11:33:21 2025 +0800

    [chore](cloud) support large operation log (#58100)
---
 cloud/src/meta-service/meta_service.cpp           | 14 ++---
 cloud/src/meta-service/meta_service_job.cpp       | 31 ++---------
 cloud/src/meta-service/meta_service_partition.cpp | 51 ++++++------------
 cloud/src/meta-service/meta_service_txn.cpp       | 35 +++---------
 cloud/src/recycler/recycler.h                     |  8 +--
 cloud/src/recycler/recycler_operation_log.cpp     | 66 ++++++++++++++++-------
 cloud/test/meta_service_operation_log_test.cpp    | 66 ++++++++++++++---------
 cloud/test/recycler_operation_log_test.cpp        | 56 +++++++++++++------
 8 files changed, 163 insertions(+), 164 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 9018c9b7837..03ce61ecfb0 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1186,16 +1186,10 @@ void 
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
     if (is_versioned_write && update_tablet_log.tablet_ids_size() > 0) {
         OperationLogPB log;
         log.mutable_update_tablet()->Swap(&update_tablet_log);
-        std::string update_log_key = versioned::log_key(instance_id);
-        std::string operation_log_value;
-        if (!log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = "failed to serialize update tablet log";
-            return;
-        }
-        versioned_put(txn.get(), update_log_key, operation_log_value);
-        LOG(INFO) << "put versioned update tablet log, key=" << 
hex(update_log_key)
-                  << " instance_id=" << instance_id << " log_size=" << 
operation_log_value.size();
+        std::string log_key = versioned::log_key(instance_id);
+        versioned::blob_put(txn.get(), log_key, log);
+        LOG(INFO) << "put update tablet operation log, key=" << hex(log_key)
+                  << " instance_id=" << instance_id;
     }
 
     err = txn->commit();
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 46cd7f44278..7a7df579fa1 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -37,6 +37,7 @@
 #include "meta-service/meta_service.h"
 #include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_tablet_stats.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/clone_chain_reader.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
@@ -1274,28 +1275,17 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
     if (!compaction_log.recycle_rowsets().empty() && is_versioned_write) {
         size_t num_recycled_rowsets = compaction_log.recycle_rowsets().size();
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(meta_reader.min_read_version());
         }
         operation_log.mutable_compaction()->Swap(&compaction_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg)
-                    .tag("instance_id", instance_id)
-                    .tag("table_id", request->job().idx().table_id());
-            return;
-        }
-        // Put versioned operation log for compaction to track recycling
-        LOG_INFO("put versioned operation log key")
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
+        LOG_INFO("put compaction operation log key")
                 .tag("instance_id", instance_id)
                 .tag("operation_log_key", hex(operation_log_key))
                 .tag("tablet_id", tablet_id)
-                .tag("value_size", operation_log_value.size())
                 .tag("recycle_rowsets_count", num_recycled_rowsets);
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
     }
 }
 
@@ -1882,29 +1872,18 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
 
     if (is_versioned_write) {
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(reader.min_read_version());
         }
         operation_log.mutable_schema_change()->Swap(&schema_change_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg)
-                    .tag("instance_id", instance_id)
-                    .tag("table_id", request->job().idx().table_id());
-            return;
-        }
-        // Put versioned operation log for compaction to track recycling
-        LOG_INFO("put versioned operation log key")
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
+        LOG_INFO("put schema change operation log key")
                 .tag("instance_id", instance_id)
                 .tag("operation_log_key", hex(operation_log_key))
                 .tag("tablet_id", tablet_id)
                 .tag("new_tablet_id", new_tablet_id)
-                .tag("value_size", operation_log_value.size())
                 .tag("recycle_rowsets_count", 
schema_change_log.recycle_rowsets().size());
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
     }
 }
 
diff --git a/cloud/src/meta-service/meta_service_partition.cpp 
b/cloud/src/meta-service/meta_service_partition.cpp
index 1321b6aee9f..f347272be27 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -24,6 +24,7 @@
 #include "common/logging.h"
 #include "common/stats.h"
 #include "meta-service/meta_service_helper.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/clone_chain_reader.h"
 #include "meta-store/keys.h"
 #include "meta-store/meta_reader.h"
@@ -295,19 +296,16 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
 
     if (commit_index_log.index_ids_size() > 0 && 
is_version_write_enabled(instance_id)) {
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(reader.min_read_version());
         }
         operation_log.mutable_commit_index()->Swap(&commit_index_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id", 
request->table_id());
-            return;
-        }
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
+        LOG_INFO("put commit index operation log key")
+                .tag("instance_id", instance_id)
+                .tag("table_id", request->table_id())
+                .tag("operation_log_key", hex(operation_log_key));
     }
 
     err = txn->commit();
@@ -419,23 +417,15 @@ void 
MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
 
     if (drop_index_log.index_ids_size() > 0 && is_versioned_write) {
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(reader.min_read_version());
         }
         operation_log.mutable_drop_index()->Swap(&drop_index_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id", 
request->table_id());
-            return;
-        }
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
         LOG(INFO) << "put drop index operation log"
                   << " instance_id=" << instance_id << " table_id=" << 
request->table_id()
-                  << " index_ids=" << drop_index_log.index_ids_size()
-                  << " log_size=" << operation_log_value.size();
+                  << " index_ids=" << drop_index_log.index_ids_size();
     }
 
     err = txn->commit();
@@ -719,19 +709,16 @@ void 
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
 
     if (commit_partition_log.partition_ids_size() > 0 && 
is_version_write_enabled(instance_id)) {
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(reader.min_read_version());
         }
         operation_log.mutable_commit_partition()->Swap(&commit_partition_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id", 
request->table_id());
-            return;
-        }
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
+        LOG_INFO("put commit partition operation log key")
+                .tag("instance_id", instance_id)
+                .tag("table_id", request->table_id())
+                .tag("operation_log_key", hex(operation_log_key));
     }
 
     err = txn->commit();
@@ -864,23 +851,15 @@ void 
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
          drop_partition_log.partition_ids_size() > 0) &&
         is_versioned_write) {
         std::string operation_log_key = versioned::log_key({instance_id});
-        std::string operation_log_value;
         OperationLogPB operation_log;
         if (is_versioned_read) {
             operation_log.set_min_timestamp(reader.min_read_version());
         }
         operation_log.mutable_drop_partition()->Swap(&drop_partition_log);
-        if (!operation_log.SerializeToString(&operation_log_value)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            msg = fmt::format("failed to serialize OperationLogPB: {}", 
hex(operation_log_key));
-            LOG_WARNING(msg).tag("instance_id", instance_id).tag("table_id", 
request->table_id());
-            return;
-        }
-        versioned_put(txn.get(), operation_log_key, operation_log_value);
+        versioned::blob_put(txn.get(), operation_log_key, operation_log);
         LOG(INFO) << "put drop partition operation log"
                   << " instance_id=" << instance_id << " table_id=" << 
request->table_id()
-                  << " partition_ids=" << 
drop_partition_log.partition_ids_size()
-                  << " log_size=" << operation_log_value.size();
+                  << " partition_ids=" << 
drop_partition_log.partition_ids_size();
     }
 
     err = txn->commit();
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index dbb73383163..afd51268e56 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -32,6 +32,7 @@
 #include "meta-service/meta_service.h"
 #include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_tablet_stats.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/clone_chain_reader.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
@@ -1829,17 +1830,9 @@ void MetaServiceImpl::commit_txn_immediately(
                 
operation_log.set_min_timestamp(meta_reader.min_read_version());
             }
             operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
-            std::string operation_log_value;
-            if (!operation_log.SerializeToString(&operation_log_value)) {
-                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-                ss << "failed to serialize operation_log, txn_id=" << txn_id;
-                msg = ss.str();
-                return;
-            }
+            versioned::blob_put(txn.get(), log_key, operation_log);
             LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
-                      << " txn_id=" << txn_id
-                      << " operation_log_size=" << operation_log_value.size();
-            versioned_put(txn.get(), log_key, operation_log_value);
+                      << " txn_id=" << txn_id;
         } else {
             std::string recycle_val;
             if (!recycle_pb.SerializeToString(&recycle_val)) {
@@ -2358,16 +2351,9 @@ void MetaServiceImpl::commit_txn_eventually(
                 
operation_log.set_min_timestamp(meta_reader.min_read_version());
             }
             operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
-            std::string operation_log_value;
-            if (!operation_log.SerializeToString(&operation_log_value)) {
-                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-                ss << "failed to serialize operation_log, txn_id=" << txn_id;
-                msg = ss.str();
-                return;
-            }
-            versioned_put(txn.get(), log_key, operation_log_value);
+            versioned::blob_put(txn.get(), log_key, operation_log);
             LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
-                      << " txn_id=" << txn_id << " log_size=" << 
operation_log_value.size();
+                      << " txn_id=" << txn_id;
         }
 
         VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
@@ -2880,21 +2866,14 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
         if (is_versioned_write) {
             commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
             std::string log_key = versioned::log_key({instance_id});
-            std::string operation_log_value;
             OperationLogPB operation_log;
             if (is_versioned_read) {
                 
operation_log.set_min_timestamp(meta_reader.min_read_version());
             }
             operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
-            if (!operation_log.SerializeToString(&operation_log_value)) {
-                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-                ss << "failed to serialize operation_log, txn_id=" << txn_id;
-                msg = ss.str();
-                return;
-            }
-            versioned_put(txn.get(), log_key, operation_log_value);
+            versioned::blob_put(txn.get(), log_key, operation_log);
             LOG(INFO) << "put commit txn operation log key=" << 
hex(recycle_key)
-                      << " txn_id=" << txn_id << " log_size=" << 
operation_log_value.size();
+                      << " txn_id=" << txn_id;
         } else {
             if (!recycle_pb.SerializeToString(&recycle_val)) {
                 code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 463f5e9cb6b..12365c8668c 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -422,11 +422,11 @@ private:
     // for scan all rs of tablet and statistics metrics
     int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& 
metrics_context);
 
-    // Recycle operation log and the log key.
+    // Recycle operation log and the log keys. The log keys are specified by 
`raw_keys`.
     //
-    // The log_key is constructed from the log_version and instance_id.
-    // Both `operation_log` and `log_key` will be removed in the same 
transaction, to ensure atomicity.
-    int recycle_operation_log(Versionstamp log_version, OperationLogPB 
operation_log);
+    // Both `operation_log` and `raw_keys` will be removed in the same 
transaction, to ensure atomicity.
+    int recycle_operation_log(Versionstamp log_version, const 
std::vector<std::string>& raw_keys,
+                              OperationLogPB operation_log);
 
     // Recycle rowset meta and data, return 0 for success otherwise error
     //
diff --git a/cloud/src/recycler/recycler_operation_log.cpp 
b/cloud/src/recycler/recycler_operation_log.cpp
index c5a95901afa..c9ea3f0ba99 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -38,6 +38,7 @@
 #include "common/util.h"
 #include "meta-service/meta_service.h"
 #include "meta-service/meta_service_schema.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
 #include "meta-store/meta_reader.h"
@@ -128,8 +129,12 @@ bool OperationLogRecycleChecker::can_recycle(const 
Versionstamp& log_versionstam
 // A recycler for operation logs.
 class OperationLogRecycler {
 public:
-    OperationLogRecycler(std::string_view instance_id, TxnKv* txn_kv, 
Versionstamp log_version)
-            : instance_id_(instance_id), txn_kv_(txn_kv), 
log_version_(log_version) {}
+    OperationLogRecycler(std::string_view instance_id, TxnKv* txn_kv, 
Versionstamp log_version,
+                         const std::vector<std::string>& raw_keys)
+            : instance_id_(instance_id),
+              txn_kv_(txn_kv),
+              log_version_(log_version),
+              raw_keys_(raw_keys) {}
     OperationLogRecycler(const OperationLogRecycler&) = delete;
     OperationLogRecycler& operator=(const OperationLogRecycler&) = delete;
 
@@ -164,6 +169,7 @@ private:
     std::string_view instance_id_;
     TxnKv* txn_kv_;
     Versionstamp log_version_;
+    const std::vector<std::string>& raw_keys_;
 
     std::unique_ptr<Transaction> txn_;
 };
@@ -372,9 +378,10 @@ int OperationLogRecycler::begin() {
 
 int OperationLogRecycler::commit() {
     // Remove the operation log entry itself after recycling its contents
-    std::string log_key = 
encode_versioned_key(versioned::log_key(instance_id_), log_version_);
     LOG_INFO("remove operation log key").tag("log_version", 
log_version_.to_string());
-    txn_->remove(log_key);
+    for (const auto& raw_key : raw_keys_) {
+        txn_->remove(raw_key);
+    }
 
     TxnErrorCode err = txn_->commit();
     if (err != TxnErrorCode::TXN_OK) {
@@ -629,7 +636,8 @@ int InstanceRecycler::recycle_operation_logs() {
     }
 
     auto scan_and_recycle_operation_log = [&](const std::string_view& key,
-                                              const std::string_view& value) {
+                                              const std::vector<std::string>& 
raw_keys,
+                                              OperationLogPB operation_log) {
         std::string_view log_key(key);
         Versionstamp log_versionstamp;
         if (!decode_versioned_key(&log_key, &log_versionstamp)) {
@@ -638,28 +646,22 @@ int InstanceRecycler::recycle_operation_logs() {
             return -1;
         }
 
-        OperationLogPB operation_log;
-        if (!operation_log.ParseFromArray(value.data(), value.size())) {
-            LOG_WARNING("failed to parse OperationLogPB from operation log 
key")
-                    .tag("key", hex(key));
-            return -1;
-        }
-
+        size_t value_size = operation_log.ByteSizeLong();
         if (recycle_checker.can_recycle(log_versionstamp, 
operation_log.min_timestamp())) {
             AnnotateTag tag("log_key", hex(key));
-            int res = recycle_operation_log(log_versionstamp, 
std::move(operation_log));
+            int res = recycle_operation_log(log_versionstamp, raw_keys, 
std::move(operation_log));
             if (res != 0) {
                 LOG_WARNING("failed to recycle operation 
log").tag("error_code", res);
                 return res;
             }
 
             recycled_operation_logs++;
-            recycled_operation_log_data_size += value.size();
+            recycled_operation_log_data_size += value_size;
         }
 
         total_operation_logs++;
-        operation_log_data_size += value.size();
-        max_operation_log_data_size = std::max(max_operation_log_data_size, 
value.size());
+        operation_log_data_size += value_size;
+        max_operation_log_data_size = std::max(max_operation_log_data_size, 
value_size);
         return 0;
     };
 
@@ -699,15 +701,39 @@ int InstanceRecycler::recycle_operation_logs() {
     std::string log_key_prefix = versioned::log_key(instance_id_);
     std::string begin_key = encode_versioned_key(log_key_prefix, 
Versionstamp::min());
     std::string end_key = encode_versioned_key(log_key_prefix, 
Versionstamp::max());
-    return scan_and_recycle(std::move(begin_key), end_key,
-                            std::move(scan_and_recycle_operation_log),
-                            std::move(is_multi_version_status_changed));
+
+    std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, begin_key, 
end_key);
+    for (size_t i = 0; iter->valid(); iter->next(), i++) {
+        std::string_view key = iter->key();
+        OperationLogPB operation_log;
+        if (!iter->parse_value(&operation_log)) {
+            LOG_WARNING("failed to parse OperationLogPB from operation log 
key")
+                    .tag("key", hex(key));
+            return -1;
+        }
+
+        int res = scan_and_recycle_operation_log(key, iter->raw_keys(), 
std::move(operation_log));
+        if (res != 0) {
+            return res;
+        }
+
+        if (i % 1000 == 0 && is_multi_version_status_changed() != 0) {
+            return -1;
+        }
+    }
+    if (iter->error_code() != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("error occurred during scanning operation logs")
+                .tag("error_code", iter->error_code());
+        return -1;
+    }
+    return 0;
 }
 
 int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
+                                            const std::vector<std::string>& 
raw_keys,
                                             OperationLogPB operation_log) {
     int recycle_log_count = 0;
-    OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), 
log_version);
+    OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), 
log_version, raw_keys);
     RETURN_ON_FAILURE(log_recycler.begin());
 
 #define RECYCLE_OPERATION_LOG(log_type, method_name)                      \
diff --git a/cloud/test/meta_service_operation_log_test.cpp 
b/cloud/test/meta_service_operation_log_test.cpp
index 66dac8bef54..af2da9b23f9 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -30,12 +30,14 @@
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
 #include "meta-store/meta_reader.h"
 #include "meta-store/txn_kv.h"
 #include "meta-store/txn_kv_error.h"
 #include "meta-store/versioned_value.h"
+#include "meta-store/versionstamp.h"
 
 namespace doris::cloud {
 // External functions from meta_service_test.cpp
@@ -103,6 +105,31 @@ static std::string dump_range(TxnKv* txn_kv, 
std::string_view begin = "",
     return buffer;
 }
 
+// It will get the latest versioned values.
+TxnErrorCode read_operation_log(Transaction* txn, std::string_view log_key,
+                                Versionstamp* log_version, OperationLogPB* 
operation_log) {
+    std::string begin_key = encode_versioned_key(log_key, Versionstamp::min());
+    std::string end_key = encode_versioned_key(log_key, Versionstamp::max());
+    auto iter = blob_get_range(txn, begin_key, end_key);
+    if (!iter->valid()) {
+        TxnErrorCode err = iter->error_code();
+        if (err != TxnErrorCode::TXN_OK) {
+            return err;
+        }
+        return TxnErrorCode::TXN_KEY_NOT_FOUND;
+    }
+    for (; iter->valid(); iter->next()) {
+        std::string_view key = iter->key();
+        if (!decode_versioned_key(&key, log_version)) {
+            return TxnErrorCode::TXN_INVALID_DATA;
+        }
+        if (!iter->parse_value(operation_log)) {
+            return TxnErrorCode::TXN_INVALID_DATA;
+        }
+    }
+    return iter->error_code();
+}
+
 TEST(MetaServiceOperationLogTest, CommitPartitionLog) {
     auto meta_service = get_meta_service(false);
     std::string instance_id = "commit_partition_log";
@@ -200,10 +227,9 @@ TEST(MetaServiceOperationLogTest, CommitPartitionLog) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value), 
TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_commit_partition());
     }
 
@@ -371,10 +397,9 @@ TEST(MetaServiceOperationLogTest, DropPartitionLog) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value), 
TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_drop_partition());
         ASSERT_EQ(operation_log.drop_partition().partition_ids_size(), 1);
         ASSERT_EQ(operation_log.drop_partition().partition_ids(0), 
partition_id + 3);
@@ -477,10 +502,9 @@ TEST(MetaServiceOperationLogTest, CommitIndexLog) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value), 
TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &version2, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_commit_index());
     }
 
@@ -712,10 +736,9 @@ TEST(MetaServiceOperationLogTest, DropIndexLog) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &version, &value), 
TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &version, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_drop_index());
         ASSERT_EQ(operation_log.drop_index().index_ids_size(), 1);
         ASSERT_EQ(operation_log.drop_index().index_ids(0), index_id + 3);
@@ -906,10 +929,9 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &version, &value), 
TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &version, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_commit_txn());
 
         const auto& commit_log = operation_log.commit_txn();
@@ -1055,11 +1077,9 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &commit_versionstamp, 
&value),
-                  TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &commit_versionstamp, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_commit_txn());
 
         const auto& commit_log = operation_log.commit_txn();
@@ -1349,11 +1369,9 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &commit_versionstamp, 
&value),
-                  TxnErrorCode::TXN_OK);
         OperationLogPB operation_log;
-        ASSERT_TRUE(operation_log.ParseFromString(value));
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &commit_versionstamp, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_commit_txn());
 
         const auto& commit_log = operation_log.commit_txn();
@@ -1473,7 +1491,7 @@ TEST(MetaServiceOperationLogTest, 
UpdateVersionedTabletMeta) {
         std::string log_key = versioned::log_key(instance_id);
         OperationLogPB operation_log;
         TxnErrorCode err =
-                versioned::document_get(txn.get(), log_key, &operation_log, 
&log_versionstamp);
+                read_operation_log(txn.get(), log_key, &log_versionstamp, 
&operation_log);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
         ASSERT_TRUE(operation_log.has_update_tablet());
         EXPECT_EQ(operation_log.update_tablet().tablet_ids_size(), 2);
diff --git a/cloud/test/recycler_operation_log_test.cpp 
b/cloud/test/recycler_operation_log_test.cpp
index 8b59321135e..90e49b9ac4e 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -29,6 +29,7 @@
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
 #include "meta-store/mem_txn_kv.h"
@@ -125,6 +126,31 @@ static void remove_instance_info(TxnKv* txn_kv) {
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit 
transaction";
 }
 
+// It will get the latest versioned values.
+TxnErrorCode read_operation_log(Transaction* txn, std::string_view log_key,
+                                Versionstamp* log_version, OperationLogPB* 
operation_log) {
+    std::string begin_key = encode_versioned_key(log_key, Versionstamp::min());
+    std::string end_key = encode_versioned_key(log_key, Versionstamp::max());
+    auto iter = blob_get_range(txn, begin_key, end_key);
+    if (!iter->valid()) {
+        TxnErrorCode err = iter->error_code();
+        if (err != TxnErrorCode::TXN_OK) {
+            return err;
+        }
+        return TxnErrorCode::TXN_KEY_NOT_FOUND;
+    }
+    for (; iter->valid(); iter->next()) {
+        std::string_view key = iter->key();
+        if (!decode_versioned_key(&key, log_version)) {
+            return TxnErrorCode::TXN_INVALID_DATA;
+        }
+        if (!iter->parse_value(operation_log)) {
+            return TxnErrorCode::TXN_INVALID_DATA;
+        }
+    }
+    return iter->error_code();
+}
+
 TEST(RecycleOperationLogTest, RecycleOneOperationLog) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     txn_kv->update_commit_version(1000);
@@ -152,14 +178,13 @@ TEST(RecycleOperationLogTest, RecycleOneOperationLog) {
         // Put a empty operation log
         std::string log_key = versioned::log_key(instance_id);
         Versionstamp versionstamp(123, 0);
-        std::string log_key_with_versionstamp = encode_versioned_key(log_key, 
versionstamp);
         OperationLogPB operation_log;
         operation_log.set_min_timestamp(versionstamp.version());
 
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        txn->put(log_key_with_versionstamp, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, versionstamp, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -218,7 +243,7 @@ TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -248,7 +273,7 @@ TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -312,7 +337,7 @@ TEST(RecycleOperationLogTest, RecycleDropPartitionLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -363,7 +388,7 @@ TEST(RecycleOperationLogTest, RecycleDropPartitionLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -481,7 +506,7 @@ TEST(RecycleOperationLogTest, RecycleCommitIndexLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -510,7 +535,7 @@ TEST(RecycleOperationLogTest, RecycleCommitIndexLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -556,7 +581,7 @@ TEST(RecycleOperationLogTest, RecycleDropIndexLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -723,7 +748,7 @@ TEST(RecycleOperationLogTest, RecycleCommitTxnLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -848,7 +873,7 @@ TEST(RecycleOperationLogTest, 
RecycleCommitTxnLogWhenTxnIsNotVisible) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -929,7 +954,7 @@ TEST(RecycleOperationLogTest, RecycleUpdateTabletLog) {
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         ASSERT_EQ(err, TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 
@@ -1235,10 +1260,9 @@ TEST(RecycleOperationLogTest, RecycleCompactionLog) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string log_key = versioned::log_key({test_instance_id});
-        std::string value;
-        ASSERT_EQ(versioned_get(txn.get(), log_key, &log_version, &value), 
TxnErrorCode::TXN_OK);
+        ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version, 
&operation_log),
+                  TxnErrorCode::TXN_OK);
 
-        ASSERT_TRUE(operation_log.ParseFromString(value));
         ASSERT_TRUE(operation_log.has_compaction());
 
         const auto& compaction_log = operation_log.compaction();
@@ -2202,7 +2226,7 @@ TEST(RecycleOperationLogTest, RecycleDeletedInstance) {
 
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
+        versioned::blob_put(txn.get(), log_key, operation_log);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to