morningman commented on a change in pull request #2093: Fix some routine load bugs
URL: https://github.com/apache/incubator-doris/pull/2093#discussion_r341033996
 
 

 ##########
 File path: 
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 ##########
 @@ -769,113 +782,48 @@ public boolean isPreviousTransactionsFinished(long 
endTransactionId, long dbId)
         return true;
     }
     
-    /**
-     * in this method should get db lock or load lock first then get txn 
manager lock , or there will be dead lock
+    /*
+     * The txn cleaner will run at a fixed interval and try to delete expired 
and timeout txns:
+     * expired: txn is in VISIBLE or ABORTED, and is expired.
+     * timeout: txn is in PREPARE, but timeout
      */
-    public void removeOldTransactions() {
+    public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
 
-        // TODO(cmy): the following 3 steps are no needed anymore, we can only 
use the last step to check
-        // the timeout txn. Because, now we set timeout for each txn same as 
timeout of their job's.
-        // But we keep the 1 and 2 step for compatibility. They should be 
deleted in 0.11.0
-
-        // to avoid dead lock (transaction lock and load lock), we do this in 
3 phases
-        // 1. get all related db ids of txn in idToTransactionState
-        Set<Long> dbIds = Sets.newHashSet();
-        readLock();
-        try {
-            for (TransactionState transactionState : 
idToTransactionState.values()) {
-                if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED
-                        || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                    if ((currentMillis - transactionState.getFinishTime()) / 
1000 > Config.label_keep_max_second) {
-                        dbIds.add(transactionState.getDbId());
-                    }
-                } else {
-                    // check if job is also deleted
-                    // streaming insert stmt not add to fe load job, should 
use this method to
-                    // recycle the timeout insert stmt load job
-                    if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE
-                            && currentMillis - 
transactionState.getPrepareTime() > transactionState.getTimeoutMs()) {
-                        dbIds.add(transactionState.getDbId());
-                    }
-                }
-            }
-        } finally {
-            readUnlock();
-        }
-
-        // 2. get all load jobs' txn id of these databases
-        Map<Long, Set<Long>> dbIdToTxnIds = Maps.newHashMap();
-        Load loadInstance = Catalog.getCurrentCatalog().getLoadInstance();
-        for (Long dbId : dbIds) {
-            Set<Long> txnIds = loadInstance.getTxnIdsByDb(dbId);
-            dbIdToTxnIds.put(dbId, txnIds);
-        }
-
-        // 3. use dbIdToTxnIds to remove old transactions, without holding 
load locks again
-        List<TransactionState> abortedTxns = Lists.newArrayList();
+        List<Long> timeoutTxns = Lists.newArrayList();
+        List<Long> expiredTxns = Lists.newArrayList();
         writeLock();
         try {
-            List<Long> transactionsToDelete = Lists.newArrayList();
             for (TransactionState transactionState : 
idToTransactionState.values()) {
-                if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED
-                        || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                    if ((currentMillis - transactionState.getFinishTime()) / 
1000 > Config.label_keep_max_second) {
-                        // if this txn is not from front end then delete it 
immediately
-                        // if this txn is from front end but could not find in 
job list, then delete it immediately
-                        if (transactionState.getSourceType() != 
LoadJobSourceType.FRONTEND
-                                || !checkTxnHasRelatedJob(transactionState, 
dbIdToTxnIds)) {
-                            
transactionsToDelete.add(transactionState.getTransactionId());
-                        }
-                    }
-                } else {
-                    // check if job is also deleted
-                    // streaming insert stmt not add to fe load job, should 
use this method to
-                    // recycle the timeout insert stmt load job
-                    if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE
-                            && currentMillis - 
transactionState.getPrepareTime() > transactionState.getTimeoutMs()) {
-                        if ((transactionState.getSourceType() != 
LoadJobSourceType.FRONTEND
-                                || !checkTxnHasRelatedJob(transactionState, 
dbIdToTxnIds))) {
-                            
transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-                            
transactionState.setFinishTime(System.currentTimeMillis());
-                            transactionState.setReason("transaction is timeout 
and is cancelled automatically");
-                            unprotectUpsertTransactionState(transactionState);
-                            abortedTxns.add(transactionState);
-                        }
-                    }
+                if (transactionState.isExpired(currentMillis)) {
+                    // remove the txn which labels are expired
+                    expiredTxns.add(transactionState.getTransactionId());
+                } else if (transactionState.isTimeout(currentMillis)) {
+                    // txn is running but timeout, abort it.
+                    timeoutTxns.add(transactionState.getTransactionId());
                 }
             }
-            
-            for (Long transId : transactionsToDelete) {
-                deleteTransaction(transId);
-                LOG.info("transaction [" + transId + "] is expired, remove it 
from transaction table");
-            }
         } finally {
             writeUnlock();
         }
 
-        for (TransactionState abortedTxn : abortedTxns) {
+        // delete expired txns
+        for (Long txnId : expiredTxns) {
+            deleteTransaction(txnId);
+            LOG.info("transaction [" + txnId + "] is expired, remove it from 
transaction manager");
+        }
+
+        // abort timeout txns
+        for (Long txnId : timeoutTxns) {
             try {
-                abortedTxn.afterStateTransform(TransactionStatus.ABORTED, 
true, abortedTxn.getReason());
+                abortTransaction(txnId, "timeout by txn manager");
+                LOG.info("transaction [" + txnId + "] is timeout, abort it by 
transaction manager");
             } catch (UserException e) {
-                // just print a log, it does not matter.
-                LOG.warn("after abort timeout txn failed. txn id: {}", 
abortedTxn.getTransactionId(), e);
+                // abort may be failed. it is acceptable. just print a log
+                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, 
e.getMessage());
 
 Review comment:
  Printing the stack trace is unnecessary here; a failed abort is expected and acceptable, and a full trace would just clutter fe.log.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to