This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 93668577f5c [fix][txn] fix txn coordinator recover handle committing
and aborting txn race condition. (#19201)
93668577f5c is described below
commit 93668577f5cff9ba1c7d685ae539fedec448a50b
Author: thetumbled <[email protected]>
AuthorDate: Wed Feb 1 09:06:09 2023 +0800
[fix][txn] fix txn coordinator recover handle committing and aborting txn
race condition. (#19201)
Fixes #19200
A transaction can last for a long time without being aborted, which causes the TB's
MaxReadPosition to stop moving, so no new snapshot is taken. With an old snapshot,
the TB has to read a lot of entries while doing recovery.
In the worst case, topics are unavailable for 30 minutes.
Avoid concurrent execution while the coordinator handles committing and aborting transactions during recovery.
---
.../broker/TransactionMetadataStoreService.java | 125 +++++++++++----------
.../impl/MLTransactionMetadataStore.java | 2 -
2 files changed, 66 insertions(+), 61 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index fb56e8e6857..07f542bbad5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -133,54 +133,66 @@ public class TransactionMetadataStoreService {
return;
}
- openTransactionMetadataStore(tcId).thenAccept((store)
-> internalPinnedExecutor.execute(() -> {
- stores.put(tcId, store);
- LOG.info("Added new transaction meta store {}",
tcId);
- long endTime = System.currentTimeMillis() +
HANDLE_PENDING_CONNECT_TIME_OUT;
- while (true) {
- // prevent thread in a busy loop.
- if (System.currentTimeMillis() < endTime) {
- CompletableFuture<Void> future =
deque.poll();
- if (future != null) {
- // complete queue request future
- future.complete(null);
- } else {
- break;
- }
- } else {
- deque.clear();
- break;
- }
- }
-
- completableFuture.complete(null);
- tcLoadSemaphore.release();
- })).exceptionally(e -> {
- internalPinnedExecutor.execute(() -> {
-
completableFuture.completeExceptionally(e.getCause());
- // release before handle request queue,
- //in order to client reconnect
infinite loop
- tcLoadSemaphore.release();
- long endTime =
System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
- while (true) {
- // prevent thread in a busy loop.
- if (System.currentTimeMillis() <
endTime) {
- CompletableFuture<Void> future
= deque.poll();
- if (future != null) {
- // this means that this tc
client connection connect fail
-
future.completeExceptionally(e);
- } else {
- break;
- }
+ TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
+ openTransactionMetadataStore(tcId, timeoutTracker,
recoverTracker).thenAccept(
+ store -> internalPinnedExecutor.execute(() -> {
+ // TransactionMetadataStore initialization
+ // need to use TransactionMetadataStore
itself.
+ // we need to put store into stores map
before
+ // handle committing and aborting
transaction.
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store
{}", tcId);
+
recoverTracker.handleCommittingAndAbortingTransaction();
+ timeoutTracker.start();
+
+ long endTime = System.currentTimeMillis()
+ HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() <
endTime) {
+ CompletableFuture<Void> future =
deque.poll();
+ if (future != null) {
+ // complete queue request
future
+ future.complete(null);
} else {
- deque.clear();
break;
}
+ } else {
+ deque.clear();
+ break;
}
- LOG.error("Add transaction metadata
store with id {} error", tcId.getId(), e);
- });
- return null;
- });
+ }
+
+ completableFuture.complete(null);
+ tcLoadSemaphore.release();
+ })).exceptionally(e -> {
+ internalPinnedExecutor.execute(() -> {
+
completableFuture.completeExceptionally(e.getCause());
+ // release before handle request queue,
+ //in order to client reconnect infinite loop
+ tcLoadSemaphore.release();
+ long endTime = System.currentTimeMillis() +
HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() < endTime) {
+ CompletableFuture<Void> future =
deque.poll();
+ if (future != null) {
+ // this means that this tc client
connection connect fail
+ future.completeExceptionally(e);
+ } else {
+ break;
+ }
+ } else {
+ deque.clear();
+ break;
+ }
+ }
+ LOG.error("Add transaction metadata store with
id {} error", tcId.getId(), e);
+ });
+ return null;
+ });
} else {
// only one command can open transaction metadata
store,
// other will be added to the deque, when the op of
openTransactionMetadataStore finished
@@ -200,9 +212,11 @@ public class TransactionMetadataStoreService {
return completableFuture;
}
- public CompletableFuture<TransactionMetadataStore>
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
- final Timer brokerClientSharedTimer =
- pulsarService.getBrokerClientSharedTimer();
+ public CompletableFuture<TransactionMetadataStore>
+ openTransactionMetadataStore(TransactionCoordinatorID tcId,
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker) {
+ final Timer brokerClientSharedTimer =
pulsarService.getBrokerClientSharedTimer();
final ServiceConfiguration serviceConfiguration =
pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new
TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
@@ -212,18 +226,11 @@ public class TransactionMetadataStoreService {
txnLogBufferedWriterConfig
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());
- return pulsarService.getBrokerService()
-
.getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
- TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
- TransactionRecoverTracker recoverTracker =
- new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
- timeoutTracker, tcId.getId());
- return transactionMetadataStoreProvider
- .openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
- timeoutTracker, recoverTracker,
-
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
- txnLogBufferedWriterConfig,
brokerClientSharedTimer);
- });
+ return
pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
+ v -> transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
+ timeoutTracker, recoverTracker,
+
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
txnLogBufferedWriterConfig,
+ brokerClientSharedTimer));
}
public CompletableFuture<Void>
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 65e9a654a78..77a56f415c0 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -127,8 +127,6 @@ public class MLTransactionMetadataStore
+ tcID.toString() + " change state to
Ready error when init it"));
} else {
-
recoverTracker.handleCommittingAndAbortingTransaction();
- timeoutTracker.start();
completableFuture.complete(MLTransactionMetadataStore.this);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}