This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ebb8d52668c [fix](recycler) Concurrent recycle cause txn commit 
conflict (#54849)
ebb8d52668c is described below

commit ebb8d52668c2da23c69f40bb57d0351021d6f8bb
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Aug 16 16:08:59 2025 +0800

    [fix](recycler) Concurrent recycle cause txn commit conflict (#54849)
    
    ### What problem does this PR solve?
    
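    When deleting a recycle txn kv fails because the commit hits a txn
    conflict, retry the delete up to 10 times, sleeping a random interval
    of less than 100 ms between attempts.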
    
    Before (a commit conflict is reported as a failed delete):
    ```
    W20250815 11:37:43.104856 448779 recycler.cpp:3925] failed to delete 
expired txn, err=Conflict 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d4
    W20250815 11:37:43.104873 448783 recycler.cpp:3925] failed to delete 
expired txn, err=Conflict 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d8
    W20250815 11:37:43.104945 448784 mem_txn_kv.cpp:200] commit conflict
    I20250815 11:37:43.104902 448777 recycler.cpp:3932] recycle expired txn, 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d1
    W20250815 11:37:43.104959 448782 recycler.cpp:3925] failed to delete 
expired txn, err=Conflict 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d7
    W20250815 11:37:43.104981 448779 recycler.cpp:3945] failed to delete 
recycle txn kv instance id="concurrent_recycle_txn_label_test_Zq285DM7d6" 
key="011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d4"
    W20250815 11:37:43.104983 448783 recycler.cpp:3945] failed to delete 
recycle txn kv instance id="concurrent_recycle_txn_label_test_Zq285DM7d6" 
key="011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d8"
    ```
    
    Now (a commit conflict is logged and the delete is retried):
    ```
    W20250815 11:56:22.749135 595891 recycler.cpp:3925] txn conflict, retry 
times=6 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
 db_id=1000 txn_id=2007
    I20250815 11:56:22.896646 595892 recycler.cpp:3942] recycle expired txn, 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d8
    W20250815 11:56:22.942795 595891 mem_txn_kv.cpp:200] commit conflict
    W20250815 11:56:22.942880 595891 recycler.cpp:3925] txn conflict, retry 
times=7 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
 db_id=1000 txn_id=2007
    I20250815 11:56:23.079614 595891 recycler.cpp:3942] recycle expired txn, 
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
    I20250815 11:56:23.079846 595741 recycler.cpp:3578] finish scan_and_recycle 
key_range=[011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e0001120000000000000000120000000000000000,011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e0001127fffffffffffffff127fffffffffffffff)
 num_scanned=10 get_range_retried=0 ret=0 err=
    I20250815 11:56:23.080000 595741 recycler.h:384] recycle instance: 
concurrent_recycle_txn_label_test_3SplIlGBSL, operation type: 
recycle_expired_txn_label, cost: 1385 ms, total recycled num: 10, total 
recycled data size: 0 bytes
    ```
---
 cloud/src/recycler/recycler.cpp |  28 ++++++-
 cloud/test/recycler_test.cpp    | 176 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 89ce765b0d8..324165e139a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -30,6 +30,7 @@
 #include <chrono>
 #include <cstddef>
 #include <cstdint>
+#include <cstdlib>
 #include <deque>
 #include <initializer_list>
 #include <numeric>
@@ -3831,6 +3832,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return 0;
     };
 
+    // int 0 for success, 1 for conflict, -1 for error
     auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
         std::string_view k1 = k;
         //RecycleTxnKeyInfo 0:instance_id  1:db_id  2:txn_id
@@ -3896,17 +3898,29 @@ int InstanceRecycler::recycle_expired_txn_label() {
         }
         if (txn_label.txn_ids().empty()) {
             txn->remove(label_key);
+            TEST_SYNC_POINT_CALLBACK(
+                    
"InstanceRecycler::recycle_expired_txn_label.remove_label_before");
         } else {
             if (!txn_label.SerializeToString(&label_val)) {
                 LOG(WARNING) << "failed to serialize txn label, key=" << 
hex(label_key);
                 return -1;
             }
+            TEST_SYNC_POINT_CALLBACK(
+                    
"InstanceRecycler::recycle_expired_txn_label.update_label_before");
             txn->atomic_set_ver_value(label_key, label_val);
+            TEST_SYNC_POINT_CALLBACK(
+                    
"InstanceRecycler::recycle_expired_txn_label.update_label_after");
         }
         // Remove recycle txn kv
         txn->remove(k);
+        
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.before_commit");
         err = txn->commit();
         if (err != TxnErrorCode::TXN_OK) {
+            if (err == TxnErrorCode::TXN_CONFLICT) {
+                TEST_SYNC_POINT_CALLBACK(
+                        
"InstanceRecycler::recycle_expired_txn_label.txn_conflict");
+                return 1;
+            }
             LOG(WARNING) << "failed to delete expired txn, err=" << err << " 
key=" << hex(k);
             return -1;
         }
@@ -3926,7 +3940,19 @@ int InstanceRecycler::recycle_expired_txn_label() {
                 &recycle_txn_info_keys);
         for (const auto& k : recycle_txn_info_keys) {
             concurrent_delete_executor.add([&]() {
-                if (delete_recycle_txn_kv(k) != 0) {
+                int ret = delete_recycle_txn_kv(k);
+                if (ret == 1) {
+                    constexpr int MAX_RETRY = 10;
+                    for (size_t i = 0; i < MAX_RETRY; ++i) {
+                        ret = delete_recycle_txn_kv(k);
+                        LOG(WARNING) << "txn conflict, retry times=" << i << " 
key=" << hex(k);
+                        if (ret != 1) {
+                            break;
+                        }
+                        // random sleep 0-100 ms to retry
+                        
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
+                    }
+                } else if (ret == -1) {
                     LOG_WARNING("failed to delete recycle txn kv")
                             .tag("instance id", instance_id_)
                             .tag("key", hex(k));
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 8a3e4ff0c71..5fc1b673dd1 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -6002,4 +6002,180 @@ TEST(RecyclerTest, 
concurrent_recycle_txn_label_failure_test) {
               << "ms" << std::endl;
     check_multiple_txn_info_kvs(txn_kv, 5000);
 }
+TEST(RecyclerTest, concurrent_recycle_txn_label_conflict_test) {
+    config::label_keep_max_second = 0;
+    config::recycle_pool_parallelism = 20;
+
+    doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+    auto recycle_txn_label_s3_producer_pool =
+            
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_s3_producer_pool->start();
+    auto recycle_txn_label_recycle_tablet_pool =
+            
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_recycle_tablet_pool->start();
+    auto recycle_txn_label_group_recycle_function_pool =
+            
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_group_recycle_function_pool->start();
+    recycle_txn_label_thread_group =
+            
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+                                    
std::move(recycle_txn_label_recycle_tablet_pool),
+                                    
std::move(recycle_txn_label_group_recycle_function_pool));
+
+    auto mem_txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(mem_txn_kv->init(), 0);
+
+    std::string shared_label = "shared_conflict_label";
+    int64_t shared_db_id = 1000;
+    std::vector<int64_t> shared_txn_ids = {2001, 2002, 2003, 2004, 2005,
+                                           2006, 2007, 2008, 2009, 2010};
+
+    // create shared TxnLabel
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string label_key;
+        std::string label_val;
+        txn_label_key({instance_id, shared_db_id, shared_label}, &label_key);
+
+        TxnLabelPB txn_label_pb;
+        for (auto txn_id : shared_txn_ids) {
+            txn_label_pb.add_txn_ids(txn_id);
+        }
+
+        if (!txn_label_pb.SerializeToString(&label_val)) {
+            FAIL() << "Failed to serialize txn label";
+        }
+
+        uint32_t offset = label_val.size();
+        label_val.append(10, '\x00'); // 10 bytes for versionstamp
+        label_val.append((const char*)&offset, 4);
+        MemTxnKv::gen_version_timestamp(123456790, 0, &label_val);
+        txn->put(label_key, label_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    int64_t current_time = duration_cast<std::chrono::milliseconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    for (auto txn_id : shared_txn_ids) {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        // RecycleTxnKeyInfo -> RecycleTxnPB (set to expired)
+        std::string recycle_txn_info_key;
+        std::string recycle_txn_info_val;
+        RecycleTxnKeyInfo recycle_txn_key_info {instance_id, shared_db_id, 
txn_id};
+        recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key);
+        RecycleTxnPB recycle_txn_pb;
+        recycle_txn_pb.set_creation_time(current_time - 300000);
+        recycle_txn_pb.set_label(shared_label);
+        if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) {
+            FAIL() << "Failed to serialize recycle txn";
+        }
+        txn->put(recycle_txn_info_key, recycle_txn_info_val);
+
+        // TxnIndexKey -> TxnIndexPB
+        std::string txn_idx_key = txn_index_key({instance_id, txn_id});
+        std::string txn_idx_val;
+        TxnIndexPB txn_index_pb;
+        if (!txn_index_pb.SerializeToString(&txn_idx_val)) {
+            FAIL() << "Failed to serialize txn index";
+        }
+        txn->put(txn_idx_key, txn_idx_val);
+
+        // TxnInfoKey -> TxnInfoPB
+        std::string info_key = txn_info_key({instance_id, shared_db_id, 
txn_id});
+        std::string info_val;
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label(shared_label);
+        if (!txn_info_pb.SerializeToString(&info_val)) {
+            FAIL() << "Failed to serialize txn info";
+        }
+        txn->put(info_key, info_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    auto* sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+
+    std::atomic<int> update_label_before_count {0};
+    std::atomic<int> remove_label_before_count {0};
+    std::atomic<int> update_label_after_count {0};
+    std::atomic<int> txn_conflict_count {0};
+
+    
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.remove_label_before",
+                      [&](auto&& args) {
+                          remove_label_before_count++;
+                          
std::this_thread::sleep_for(std::chrono::milliseconds(60));
+                      });
+
+    
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.update_label_before",
+                      [&](auto&& args) {
+                          update_label_before_count++;
+                          
std::this_thread::sleep_for(std::chrono::milliseconds(80));
+                      });
+
+    
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.update_label_after",
+                      [&](auto&& args) { update_label_after_count++; });
+
+    sp->set_call_back(
+            "InstanceRecycler::recycle_expired_txn_label.before_commit",
+            [&](auto&& args) { 
std::this_thread::sleep_for(std::chrono::milliseconds(20)); });
+
+    
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.txn_conflict", 
[&](auto&& args) {
+        txn_conflict_count++;
+        LOG(WARNING) << "Transaction conflict detected in test";
+    });
+
+    sp->enable_processing();
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    InstanceRecycler recycler(mem_txn_kv, instance, 
recycle_txn_label_thread_group,
+                              std::make_shared<TxnLazyCommitter>(mem_txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    auto start = std::chrono::steady_clock::now();
+    ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+    auto finish = std::chrono::steady_clock::now();
+
+    std::cout << "Concurrent recycle cost="
+              << std::chrono::duration_cast<std::chrono::milliseconds>(finish 
- start).count()
+              << "ms" << std::endl;
+    std::cout << "Update label before count: " << update_label_before_count << 
std::endl;
+    std::cout << "Update label after count: " << update_label_after_count << 
std::endl;
+    std::cout << "Transaction conflict count: " << txn_conflict_count << 
std::endl;
+
+    EXPECT_GT(txn_conflict_count, 0) << "txn_conflict sync point should be 
triggered";
+
+    std::unique_ptr<Transaction> verify_txn;
+    ASSERT_EQ(mem_txn_kv->create_txn(&verify_txn), TxnErrorCode::TXN_OK);
+
+    RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0};
+    RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, 
INT64_MAX};
+    std::string begin_key = recycle_txn_key(recycle_txn_key_info0);
+    std::string end_key = recycle_txn_key(recycle_txn_key_info1);
+
+    std::unique_ptr<RangeGetIterator> it;
+    ASSERT_EQ(verify_txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    EXPECT_EQ(it->size(), 0) << "All recycle txn keys should be deleted";
+
+    std::string label_key;
+    std::string label_val;
+    txn_label_key({instance_id, shared_db_id, shared_label}, &label_key);
+    EXPECT_EQ(verify_txn->get(label_key, &label_val), 
TxnErrorCode::TXN_KEY_NOT_FOUND)
+            << "Shared label should be deleted";
+
+    for (auto txn_id : shared_txn_ids) {
+        std::string info_key = txn_info_key({instance_id, shared_db_id, 
txn_id});
+        std::string info_val;
+        EXPECT_EQ(verify_txn->get(info_key, &info_val), 
TxnErrorCode::TXN_KEY_NOT_FOUND)
+                << "TxnInfo for txn_id " << txn_id << " should be deleted";
+    }
+}
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to