gavinchou commented on code in PR #50037:
URL: https://github.com/apache/doris/pull/50037#discussion_r2052407369


##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);

Review Comment:
   use MemKv to get rid of persisting anything in FDB



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {
+    std::string key;
+    std::string val;
+    int64_t db_id = i;
+    int64_t txn_id = 100000 + i;
+    int64_t sub_txn_id = 200000 + i;
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &key);
+    RecycleTxnPB recycle_txn_pb;
+    if (i < 8000) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + 
std::to_string(i));
+    if (!recycle_txn_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize recycle txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             recycle_txn_pb.label(), hex(key));
+    txn->put(key, val);
+
+    // TxnIndexKey -> TxnIndexPB
+    key.clear();

Review Comment:
   better not to reuse var key value
   use new var names like
   txn_idx_key txn_idx_value



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {

Review Comment:
   add comment what it does
   and suggest naming is  `make_single_txn_related_kvs`



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {
+    std::string key;
+    std::string val;
+    int64_t db_id = i;
+    int64_t txn_id = 100000 + i;
+    int64_t sub_txn_id = 200000 + i;
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &key);
+    RecycleTxnPB recycle_txn_pb;
+    if (i < 8000) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + 
std::to_string(i));
+    if (!recycle_txn_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize recycle txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             recycle_txn_pb.label(), hex(key));
+    txn->put(key, val);
+
+    // TxnIndexKey -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, txn_id});
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnInfoKey -> TxnInfoPB
+    key = txn_info_key({instance_id, db_id, txn_id});
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.add_sub_txn_ids(sub_txn_id);
+    txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+    if (!txn_info_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // SubTxnIndex -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, sub_txn_id});
+    TxnIndexPB sub_txn_index_pb;
+    if (!sub_txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize sub txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnLabel -> TxnLabelPB
+    key.clear();
+    val.clear();
+    txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key);
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.add_txn_ids(txn_id);
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             txn_info_pb.label(), hex(key));
+    if (!txn_label_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn label")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    MemTxnKv::gen_version_timestamp(123456790, 0, &val);
+    txn->put(key, val);
+
+    return 0;
+}
+
+void put_kv(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    for (int64_t i = 0; i < 5000; i++) {
+        put_single_kv(txn, i);
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

Review Comment:
   this commit may fail if we are using fdb, due to it is a 5000-kv  large txn.



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {
+    std::string key;
+    std::string val;
+    int64_t db_id = i;
+    int64_t txn_id = 100000 + i;
+    int64_t sub_txn_id = 200000 + i;
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &key);
+    RecycleTxnPB recycle_txn_pb;
+    if (i < 8000) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + 
std::to_string(i));
+    if (!recycle_txn_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize recycle txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             recycle_txn_pb.label(), hex(key));
+    txn->put(key, val);
+
+    // TxnIndexKey -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, txn_id});
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnInfoKey -> TxnInfoPB
+    key = txn_info_key({instance_id, db_id, txn_id});
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.add_sub_txn_ids(sub_txn_id);
+    txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+    if (!txn_info_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // SubTxnIndex -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, sub_txn_id});
+    TxnIndexPB sub_txn_index_pb;
+    if (!sub_txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize sub txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnLabel -> TxnLabelPB
+    key.clear();
+    val.clear();
+    txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key);
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.add_txn_ids(txn_id);
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             txn_info_pb.label(), hex(key));
+    if (!txn_label_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn label")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    MemTxnKv::gen_version_timestamp(123456790, 0, &val);
+    txn->put(key, val);
+
+    return 0;
+}
+
+void put_kv(std::shared_ptr<cloud::TxnKv> txn_kv) {

Review Comment:
   add comment what it does
   and suggest naming is  `make_multiple_txn_info_kvs`



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {
+    std::string key;
+    std::string val;
+    int64_t db_id = i;
+    int64_t txn_id = 100000 + i;
+    int64_t sub_txn_id = 200000 + i;
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &key);
+    RecycleTxnPB recycle_txn_pb;
+    if (i < 8000) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + 
std::to_string(i));
+    if (!recycle_txn_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize recycle txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             recycle_txn_pb.label(), hex(key));
+    txn->put(key, val);
+
+    // TxnIndexKey -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, txn_id});
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnInfoKey -> TxnInfoPB
+    key = txn_info_key({instance_id, db_id, txn_id});
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.add_sub_txn_ids(sub_txn_id);
+    txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+    if (!txn_info_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // SubTxnIndex -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, sub_txn_id});
+    TxnIndexPB sub_txn_index_pb;
+    if (!sub_txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize sub txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnLabel -> TxnLabelPB
+    key.clear();
+    val.clear();
+    txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key);
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.add_txn_ids(txn_id);
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             txn_info_pb.label(), hex(key));
+    if (!txn_label_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn label")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    MemTxnKv::gen_version_timestamp(123456790, 0, &val);
+    txn->put(key, val);
+
+    return 0;
+}
+
+void put_kv(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    for (int64_t i = 0; i < 5000; i++) {
+        put_single_kv(txn, i);
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    for (int64_t i = 5000; i < 10000; i++) {
+        put_single_kv(txn, i);
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+void check_single_kv(const std::string_view& k, const std::string_view& v,

Review Comment:
   add comment what it does
   and suggest naming is  `check_single_txn_info_kvs`



##########
cloud/test/recycler_test.cpp:
##########
@@ -4051,4 +4055,232 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+static std::string generate_random_string(int length) {
+    std::string char_set = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device rd;
+    std::mt19937 generator(rd());
+    std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+    std::string randomString;
+    for (int i = 0; i < length; ++i) {
+        randomString += char_set[distribution(generator)];
+    }
+    return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" + 
generate_random_string(10);
+
+int put_single_kv(const std::unique_ptr<Transaction>& txn, int64_t i) {
+    std::string key;
+    std::string val;
+    int64_t db_id = i;
+    int64_t txn_id = 100000 + i;
+    int64_t sub_txn_id = 200000 + i;
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &key);
+    RecycleTxnPB recycle_txn_pb;
+    if (i < 8000) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + 
std::to_string(i));
+    if (!recycle_txn_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize recycle txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             recycle_txn_pb.label(), hex(key));
+    txn->put(key, val);
+
+    // TxnIndexKey -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, txn_id});
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnInfoKey -> TxnInfoPB
+    key = txn_info_key({instance_id, db_id, txn_id});
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.add_sub_txn_ids(sub_txn_id);
+    txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+    if (!txn_info_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn info")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // SubTxnIndex -> TxnIndexPB
+    key.clear();
+    val.clear();
+    key = txn_index_key({instance_id, sub_txn_id});
+    TxnIndexPB sub_txn_index_pb;
+    if (!sub_txn_index_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize sub txn index")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    txn->put(key, val);
+
+    // TxnLabel -> TxnLabelPB
+    key.clear();
+    val.clear();
+    txn_label_key({instance_id, db_id, txn_info_pb.label()}, &key);
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.add_txn_ids(txn_id);
+    LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                             txn_info_pb.label(), hex(key));
+    if (!txn_label_pb.SerializeToString(&val)) {
+        LOG_WARNING("failed to serialize txn label")
+                .tag("key", hex(key))
+                .tag("db_id", db_id)
+                .tag("txn_id", txn_id);
+        return -1;
+    }
+    MemTxnKv::gen_version_timestamp(123456790, 0, &val);
+    txn->put(key, val);
+
+    return 0;
+}
+
+void put_kv(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    for (int64_t i = 0; i < 5000; i++) {
+        put_single_kv(txn, i);
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    for (int64_t i = 5000; i < 10000; i++) {
+        put_single_kv(txn, i);
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+void check_single_kv(const std::string_view& k, const std::string_view& v,
+                     std::shared_ptr<cloud::TxnKv> txn_kv) {
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    // check RecycleTxnInfo
+    RecycleTxnPB recycle_txn_pb;
+    ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size()));
+    std::string_view k1 = k;
+
+    // check TxnIndex
+    std::string index_key, index_value;
+    k1.remove_prefix(1); // Remove key space
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+    ASSERT_EQ(decode_key(&k1, &out), 0);
+    int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+    int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+    index_key = txn_index_key({instance_id, txn_id});
+    ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK);
+
+    // check TxnInfo
+    std::string info_key, info_val;
+    txn_info_key({instance_id, db_id, txn_id}, &info_key);
+    ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+
+    // check SubTxnIndex
+    TxnInfoPB txn_info;
+    ASSERT_TRUE(txn_info.ParseFromString(info_val));
+    std::vector<std::string> sub_txn_index_keys;
+    std::string sub_txn_index_value;
+    for (auto sub_txn_id : txn_info.sub_txn_ids()) {
+        auto sub_txn_index_key = txn_index_key({instance_id, sub_txn_id});
+        sub_txn_index_keys.push_back(sub_txn_index_key);
+    }
+    for (auto& sub_txn_index_key : sub_txn_index_keys) {
+        ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value), 
TxnErrorCode::TXN_OK);
+    }
+
+    // check TxnLabel
+    std::string label_key, label_val;
+    txn_label_key({instance_id, db_id, txn_info.label()}, &label_key);
+    ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK)
+            << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", 
instance_id, db_id,
+                           txn_info.label(), hex(label_key));
+}
+
+void check_kv(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t size) {

Review Comment:
   add comment what it does
   and suggest naming is  `check_multiple_txn_info_kvs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to