This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c09afbbf2a7 branch-4.0: [fix](txn) Committed transaction should not be aborted #59850 (#60000)
c09afbbf2a7 is described below

commit c09afbbf2a71cad207a4b42f505daef12c8c7e33
Author: Yixuan Wang <[email protected]>
AuthorDate: Mon Jan 19 11:31:09 2026 +0800

    branch-4.0: [fix](txn) Committed transaction should not be aborted #59850 (#60000)
    
    pick: https://github.com/apache/doris/pull/59850
---
 cloud/src/meta-service/meta_service_txn.cpp   |  8 +++
 cloud/src/meta-service/txn_lazy_committer.cpp |  7 +++
 cloud/test/txn_lazy_commit_test.cpp           | 88 +++++++++++++++++++++++++++
 gensrc/proto/cloud.proto                      |  1 +
 4 files changed, 104 insertions(+)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 9b1cfc49836..e7f76f2b7ff 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2391,6 +2391,8 @@ void MetaServiceImpl::commit_txn_eventually(
             return;
         }
 
+        
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::abort_txn_after_mark_txn_commited");
+
         
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
                                          &txn_id);
         std::shared_ptr<TxnLazyCommitTask> task = 
txn_lazy_committer_->submit(instance_id, txn_id);
@@ -3134,6 +3136,12 @@ void _abort_txn(const std::string& instance_id, const 
AbortTxnRequest* request,
             msg = ss.str();
             return;
         }
+        if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_COMMITTED) {
+            code = MetaServiceCode::TXN_ALREADY_COMMITED;
+            ss << "transaction [" << txn_id << "] is already COMMITED, db_id=" 
<< db_id;
+            msg = ss.str();
+            return;
+        }
     } else {
         VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " 
label=" << label;
         //abort txn by label.
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 2556630e964..920b71f8d96 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -641,6 +641,13 @@ void TxnLazyCommitTask::commit() {
         return;
     }
 
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+        // The txn has been aborted, no need to commit again.
+        code_ = MetaServiceCode::TXN_ALREADY_ABORTED;
+        LOG(INFO) << "txn_id=" << txn_id_ << " is already aborted, skip 
commit";
+        return;
+    }
+
     bool is_versioned_write = txn_info.versioned_write();
     bool is_versioned_read = txn_info.versioned_read();
     CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 7899ea83ebb..1b3125970e3 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -3248,4 +3248,92 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithSchemaChangeTest) {
     sp->disable_processing();
 }
 
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortAfterCommitTest) {
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 55432134;
+    int64_t table_id = 326843;
+    int64_t index_id = 34345678;
+    int64_t partition_id = 212343;
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    brpc::Controller cntl;
+    BeginTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.set_db_id(db_id);
+    txn_info_pb.set_label("test_label_commit_txn_eventually");
+    txn_info_pb.add_table_ids(table_id);
+    txn_info_pb.set_timeout_ms(36000);
+    req.mutable_txn_info()->CopyFrom(txn_info_pb);
+    BeginTxnResponse res;
+    
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req, &res,
+                            nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    int64_t txn_id = res.txn_id();
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->enable_processing();
+    
sp->set_call_back("commit_txn_eventually::abort_txn_after_mark_txn_commited", 
[&](auto&&) {
+        AbortTxnRequest req;
+        AbortTxnResponse res;
+        req.set_txn_id(txn_id);
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_COMMITED);
+    });
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 372323;
+    for (int i = 0; i < 2001; ++i) {
+        create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
+                                    tablet_id_base + i);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
+        CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 2001; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tmp_rowset_exist(txn, tablet_id, txn_id);
+        }
+    }
+
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        for (int i = 0; i < 2001; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            req.add_base_tablet_ids(tablet_id);
+        }
+        CommitTxnResponse res;
+        
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 2001; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+        }
+    }
+}
+
 } // namespace doris::cloud
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 08904998a3c..855c4d76378 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1769,6 +1769,7 @@ enum MetaServiceCode {
     TABLET_NOT_FOUND = 2011;
     STALE_TABLET_CACHE = 2012;
     STALE_PREPARE_ROWSET = 2013;
+    TXN_ALREADY_COMMITED = 2014;
 
     CLUSTER_NOT_FOUND = 3001;
     ALREADY_EXISTED = 3002;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to