This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 93668577f5c [fix][txn] fix txn coordinator recover handle committing 
and aborting txn race condition. (#19201)
93668577f5c is described below

commit 93668577f5cff9ba1c7d685ae539fedec448a50b
Author: thetumbled <[email protected]>
AuthorDate: Wed Feb 1 09:06:09 2023 +0800

    [fix][txn] fix txn coordinator recover handle committing and aborting txn 
race condition. (#19201)
    
    Fixes #19200
    
    A transaction can last for a long time without being aborted, which causes 
the TB's MaxReadPosition to stop moving, so no new snapshot is taken. With an 
old snapshot, the TB has to read a lot of entries while doing recovery.
    In the worst case, topics are unavailable for 30 minutes.
    
    Avoid concurrent execution.
---
 .../broker/TransactionMetadataStoreService.java    | 125 +++++++++++----------
 .../impl/MLTransactionMetadataStore.java           |   2 -
 2 files changed, 66 insertions(+), 61 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index fb56e8e6857..07f542bbad5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -133,54 +133,66 @@ public class TransactionMetadataStoreService {
                             return;
                         }
 
-                        openTransactionMetadataStore(tcId).thenAccept((store) 
-> internalPinnedExecutor.execute(() -> {
-                            stores.put(tcId, store);
-                            LOG.info("Added new transaction meta store {}", 
tcId);
-                            long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
-                            while (true) {
-                                // prevent thread in a busy loop.
-                                if (System.currentTimeMillis() < endTime) {
-                                    CompletableFuture<Void> future = 
deque.poll();
-                                    if (future != null) {
-                                        // complete queue request future
-                                        future.complete(null);
-                                    } else {
-                                        break;
-                                    }
-                                } else {
-                                    deque.clear();
-                                    break;
-                                }
-                            }
-
-                            completableFuture.complete(null);
-                            tcLoadSemaphore.release();
-                        })).exceptionally(e -> {
-                            internalPinnedExecutor.execute(() -> {
-                                        
completableFuture.completeExceptionally(e.getCause());
-                                        // release before handle request queue,
-                                        //in order to client reconnect 
infinite loop
-                                        tcLoadSemaphore.release();
-                                        long endTime = 
System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                                        while (true) {
-                                            // prevent thread in a busy loop.
-                                            if (System.currentTimeMillis() < 
endTime) {
-                                                CompletableFuture<Void> future 
= deque.poll();
-                                                if (future != null) {
-                                                    // this means that this tc 
client connection connect fail
-                                                    
future.completeExceptionally(e);
-                                                } else {
-                                                    break;
-                                                }
+                        TransactionTimeoutTracker timeoutTracker = 
timeoutTrackerFactory.newTracker(tcId);
+                        TransactionRecoverTracker recoverTracker =
+                                new 
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                        timeoutTracker, tcId.getId());
+                        openTransactionMetadataStore(tcId, timeoutTracker, 
recoverTracker).thenAccept(
+                                store -> internalPinnedExecutor.execute(() -> {
+                                    // TransactionMetadataStore initialization
+                                    // need to use TransactionMetadataStore 
itself.
+                                    // we need to put store into stores map 
before
+                                    // handle committing and aborting 
transaction.
+                                    stores.put(tcId, store);
+                                    LOG.info("Added new transaction meta store 
{}", tcId);
+                                    
recoverTracker.handleCommittingAndAbortingTransaction();
+                                    timeoutTracker.start();
+
+                                    long endTime = System.currentTimeMillis() 
+ HANDLE_PENDING_CONNECT_TIME_OUT;
+                                    while (true) {
+                                        // prevent thread in a busy loop.
+                                        if (System.currentTimeMillis() < 
endTime) {
+                                            CompletableFuture<Void> future = 
deque.poll();
+                                            if (future != null) {
+                                                // complete queue request 
future
+                                                future.complete(null);
                                             } else {
-                                                deque.clear();
                                                 break;
                                             }
+                                        } else {
+                                            deque.clear();
+                                            break;
                                         }
-                                        LOG.error("Add transaction metadata 
store with id {} error", tcId.getId(), e);
-                                    });
-                                    return null;
-                                });
+                                    }
+
+                                    completableFuture.complete(null);
+                                    tcLoadSemaphore.release();
+                                })).exceptionally(e -> {
+                            internalPinnedExecutor.execute(() -> {
+                                
completableFuture.completeExceptionally(e.getCause());
+                                // release before handle request queue,
+                                //in order to client reconnect infinite loop
+                                tcLoadSemaphore.release();
+                                long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
+                                while (true) {
+                                    // prevent thread in a busy loop.
+                                    if (System.currentTimeMillis() < endTime) {
+                                        CompletableFuture<Void> future = 
deque.poll();
+                                        if (future != null) {
+                                            // this means that this tc client 
connection connect fail
+                                            future.completeExceptionally(e);
+                                        } else {
+                                            break;
+                                        }
+                                    } else {
+                                        deque.clear();
+                                        break;
+                                    }
+                                }
+                                LOG.error("Add transaction metadata store with 
id {} error", tcId.getId(), e);
+                            });
+                            return null;
+                        });
                     } else {
                         // only one command can open transaction metadata 
store,
                         // other will be added to the deque, when the op of 
openTransactionMetadataStore finished
@@ -200,9 +212,11 @@ public class TransactionMetadataStoreService {
         return completableFuture;
     }
 
-    public CompletableFuture<TransactionMetadataStore> 
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        final Timer brokerClientSharedTimer =
-                pulsarService.getBrokerClientSharedTimer();
+    public CompletableFuture<TransactionMetadataStore>
+    openTransactionMetadataStore(TransactionCoordinatorID tcId,
+                                 TransactionTimeoutTracker timeoutTracker,
+                                 TransactionRecoverTracker recoverTracker) {
+        final Timer brokerClientSharedTimer = 
pulsarService.getBrokerClientSharedTimer();
         final ServiceConfiguration serviceConfiguration = 
pulsarService.getConfiguration();
         final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new 
TxnLogBufferedWriterConfig();
         
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
@@ -212,18 +226,11 @@ public class TransactionMetadataStoreService {
         txnLogBufferedWriterConfig
                 
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());
 
-        return pulsarService.getBrokerService()
-                
.getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
-                            TransactionTimeoutTracker timeoutTracker = 
timeoutTrackerFactory.newTracker(tcId);
-                            TransactionRecoverTracker recoverTracker =
-                                    new 
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
-                                    timeoutTracker, tcId.getId());
-                            return transactionMetadataStoreProvider
-                                    .openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
-                                            timeoutTracker, recoverTracker,
-                                            
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
-                                            txnLogBufferedWriterConfig, 
brokerClientSharedTimer);
-                });
+        return 
pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
+                v -> transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
+                        timeoutTracker, recoverTracker,
+                        
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), 
txnLogBufferedWriterConfig,
+                        brokerClientSharedTimer));
     }
 
     public CompletableFuture<Void> 
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 65e9a654a78..77a56f415c0 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -127,8 +127,6 @@ public class MLTransactionMetadataStore
                                         + tcID.toString() + " change state to 
Ready error when init it"));
 
                     } else {
-                        
recoverTracker.handleCommittingAndAbortingTransaction();
-                        timeoutTracker.start();
                         
completableFuture.complete(MLTransactionMetadataStore.this);
                         
recoverTime.setRecoverEndTime(System.currentTimeMillis());
                     }

Reply via email to