This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9053834d5db4034bd70c794577711ea0a4f0d005 Author: AlexYue <[email protected]> AuthorDate: Mon Mar 20 17:55:03 2023 +0800 [bug](txn) fix concurrent txns's status data race when aborting txn (#17893) --- be/src/agent/task_worker_pool.cpp | 2 ++ .../apache/doris/service/FrontendServiceImpl.java | 10 +++++-- .../doris/transaction/GlobalTransactionMgr.java | 34 ++++++++++++++++++---- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index e9982416be..f835dd4d14 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -700,6 +700,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { _tasks.push_back(agent_task_req); _worker_thread_condition_variable.notify_one(); } + LOG(INFO) << "wait for previous publish version task to be done" + << "transaction_id: " << publish_version_req.transaction_id; break; } else { LOG_WARNING("failed to publish version") diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b00cfab65a..4be9605815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -789,7 +789,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { Env.getCurrentGlobalTransactionMgr() .commitTransaction2PC(database, tableList, request.getTxnId(), 5000); } else if (txnOperation.equalsIgnoreCase("abort")) { - Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId()); + Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId(), tableList); } else { throw new UserException("transaction operation should be \'commit\' or \'abort\'"); } @@ -911,9 +911,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { 
throw new MetaNotFoundException("db " + request.getDb() + " does not exist"); } long dbId = db.getId(); + DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId); + TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); + if (transactionState == null) { + throw new UserException("transaction [" + request.getTxnId() + "] not found"); + } + List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList()); Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(), request.isSetReason() ? request.getReason() : "system cancel", - TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 4621f08700..bcd91383e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -297,13 +297,29 @@ public class GlobalTransactionMgr implements Writable { } public void abortTransaction(long dbId, long transactionId, String reason) throws UserException { - abortTransaction(dbId, transactionId, reason, null); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + TransactionState transactionState = getDatabaseTransactionMgr(dbId).getTransactionState(transactionId); + List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList()); + abortTransaction(dbId, transactionId, reason, null, tableList); + } + + public void abortTransaction(long dbId, long transactionId, String reason, List<Table> tableList) + throws UserException { + abortTransaction(dbId, transactionId, reason, null, tableList); } public void 
abortTransaction(Long dbId, Long txnId, String reason, - TxnCommitAttachment txnCommitAttachment) throws UserException { + TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment); + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } } // for http cancel stream load api @@ -312,9 +328,17 @@ public class GlobalTransactionMgr implements Writable { dbTransactionMgr.abortTransaction(label, reason); } - public void abortTransaction2PC(Long dbId, long transactionId) throws UserException { + public void abortTransaction2PC(Long dbId, long transactionId, List<Table> tableList) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.abortTransaction2PC(transactionId); + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + dbTransactionMgr.abortTransaction2PC(transactionId); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } } /* --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
