This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9053834d5db4034bd70c794577711ea0a4f0d005
Author: AlexYue <[email protected]>
AuthorDate: Mon Mar 20 17:55:03 2023 +0800

    [bug](txn) fix concurrent txns' status data race when aborting txn (#17893)
---
 be/src/agent/task_worker_pool.cpp                  |  2 ++
 .../apache/doris/service/FrontendServiceImpl.java  | 10 +++++--
 .../doris/transaction/GlobalTransactionMgr.java    | 34 ++++++++++++++++++----
 3 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index e9982416be..f835dd4d14 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -700,6 +700,8 @@ void 
TaskWorkerPool::_publish_version_worker_thread_callback() {
                     _tasks.push_back(agent_task_req);
                     _worker_thread_condition_variable.notify_one();
                 }
+                LOG(INFO) << "wait for previous publish version task to be 
done"
+                          << "transaction_id: " << 
publish_version_req.transaction_id;
                 break;
             } else {
                 LOG_WARNING("failed to publish version")
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b00cfab65a..4be9605815 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -789,7 +789,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             Env.getCurrentGlobalTransactionMgr()
                     .commitTransaction2PC(database, tableList, 
request.getTxnId(), 5000);
         } else if (txnOperation.equalsIgnoreCase("abort")) {
-            
Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), 
request.getTxnId());
+            
Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), 
request.getTxnId(), tableList);
         } else {
             throw new UserException("transaction operation should be 
\'commit\' or \'abort\'");
         }
@@ -911,9 +911,15 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw new MetaNotFoundException("db " + request.getDb() + " does 
not exist");
         }
         long dbId = db.getId();
+        DatabaseTransactionMgr dbTransactionMgr = 
Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
+        TransactionState transactionState = 
dbTransactionMgr.getTransactionState(request.getTxnId());
+        if (transactionState == null) {
+            throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
+        }
+        List<Table> tableList = 
db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
         Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, 
request.getTxnId(),
                 request.isSetReason() ? request.getReason() : "system cancel",
-                
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+                
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 4621f08700..bcd91383e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -297,13 +297,29 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public void abortTransaction(long dbId, long transactionId, String reason) 
throws UserException {
-        abortTransaction(dbId, transactionId, reason, null);
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        TransactionState transactionState = 
getDatabaseTransactionMgr(dbId).getTransactionState(transactionId);
+        List<Table> tableList = 
db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
+        abortTransaction(dbId, transactionId, reason, null, tableList);
+    }
+
+    public void abortTransaction(long dbId, long transactionId, String reason, 
List<Table> tableList)
+            throws UserException {
+        abortTransaction(dbId, transactionId, reason, null, tableList);
     }
 
     public void abortTransaction(Long dbId, Long txnId, String reason,
-            TxnCommitAttachment txnCommitAttachment) throws UserException {
+            TxnCommitAttachment txnCommitAttachment, List<Table> tableList) 
throws UserException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
-        dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, 
TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=("
+                    + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            dbTransactionMgr.abortTransaction(txnId, reason, 
txnCommitAttachment);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
     }
 
     // for http cancel stream load api
@@ -312,9 +328,17 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.abortTransaction(label, reason);
     }
 
-    public void abortTransaction2PC(Long dbId, long transactionId) throws 
UserException {
+    public void abortTransaction2PC(Long dbId, long transactionId, List<Table> 
tableList) throws UserException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
-        dbTransactionMgr.abortTransaction2PC(transactionId);
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, 
TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=("
+                    + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            dbTransactionMgr.abortTransaction2PC(transactionId);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
     }
 
     /*


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to