This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0d303c89dfcda16991b1ceb8844645a89a347603 Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Wed Jun 15 14:17:44 2022 +0800 [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957) [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. ### Motivation The MLTransactionMetadataStore constructor and the openTransactionMetadataStore method are asynchronous, so a store that is still in the Initializing state may be put into stores. ### Modification Use a future to wait for MLTransactionMetadataStore initialization to complete. (cherry picked from commit 0fe8ac0d2c1bb909729580e5456b4d57c2a00346) --- .../pulsar/broker/transaction/TransactionTest.java | 13 +- .../impl/MLTransactionMetadataStore.java | 174 +++++++++++---------- .../impl/MLTransactionMetadataStoreProvider.java | 4 +- .../MLTransactionMetadataStoreTest.java | 38 ++--- 4 files changed, 121 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 10aa86aed2c..f9fc52aa547 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -695,9 +695,8 @@ public class TransactionTest extends TransactionTestBase { doNothing().when(timeoutTracker).start(); MLTransactionMetadataStore metadataStore1 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), - mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); - + mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator); + metadataStore1.init(transactionRecoverTracker).get(); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore1.getCoordinatorStats().state, "Ready")); @@ -709,8
+708,8 @@ public class TransactionTest extends TransactionTestBase { MLTransactionMetadataStore metadataStore2 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), - mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); + mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator); + metadataStore2.init(transactionRecoverTracker).get(); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore2.getCoordinatorStats().state, "Ready")); @@ -722,8 +721,8 @@ public class TransactionTest extends TransactionTestBase { MLTransactionMetadataStore metadataStore3 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), - mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); + mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator); + metadataStore3.init(transactionRecoverTracker).get(); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore3.getCoordinatorStats().state, "Ready")); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 685d57e664e..6c88d27cc22 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; import 
org.apache.pulsar.common.api.proto.Subscription; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; @@ -79,7 +80,6 @@ public class MLTransactionMetadataStore public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog, TransactionTimeoutTracker timeoutTracker, - TransactionRecoverTracker recoverTracker, MLTransactionSequenceIdGenerator sequenceIdGenerator) { super(State.None); this.sequenceIdGenerator = sequenceIdGenerator; @@ -96,96 +96,108 @@ public class MLTransactionMetadataStore DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + tcID.toString() + "thread_factory"); this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); + } + public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) { + CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>(); if (!changeToInitializingState()) { log.error("Managed ledger transaction metadata store change state error when init it"); - return; - } - - internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { - - @Override - public void replayComplete() { - recoverTracker.appendOpenTransactionToTimeoutTracker(); - if (!changeToReadyState()) { - log.error("Managed ledger transaction metadata store change state error when replay complete"); - } else { - recoverTracker.handleCommittingAndAbortingTransaction(); - timeoutTracker.start(); + completableFuture + .completeExceptionally(new TransactionCoordinatorClientException + .CoordinatorNotFoundException("transaction metadata store with tcId " + + tcID.toString() + " change state to Initializing error when init it")); + } else { + internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { + + @Override + public void replayComplete() { + recoverTracker.appendOpenTransactionToTimeoutTracker(); 
+ if (!changeToReadyState()) { + log.error("Managed ledger transaction metadata store change state error when replay complete"); + completableFuture + .completeExceptionally(new TransactionCoordinatorClientException + .CoordinatorNotFoundException("transaction metadata store with tcId " + + tcID.toString() + " change state to Ready error when init it")); + + } else { + recoverTracker.handleCommittingAndAbortingTransaction(); + timeoutTracker.start(); + completableFuture.complete(MLTransactionMetadataStore.this); + } } - } - - @Override - public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) { - try { + @Override + public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) { - TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(), + try { + TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(), transactionMetadataEntry.getTxnidLeastBits()); - long transactionId = transactionMetadataEntry.getTxnidLeastBits(); - switch (transactionMetadataEntry.getMetadataOp()) { - case NEW: - long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits(); - if (txnMetaMap.containsKey(transactionId)) { - txnMetaMap.get(transactionId).getRight().add(position); - } else { - List<Position> positions = new ArrayList<>(); - positions.add(position); - long openTimestamp = transactionMetadataEntry.getStartTime(); - long timeoutAt = transactionMetadataEntry.getTimeoutMs(); - txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID, - openTimestamp, timeoutAt), positions)); - recoverTracker.handleOpenStatusTransaction(txnSequenceId, - timeoutAt + openTimestamp); - } - break; - case ADD_PARTITION: - if (!txnMetaMap.containsKey(transactionId)) { - transactionLog.deletePosition(Collections.singletonList(position)); - } else { - txnMetaMap.get(transactionId).getLeft() - .addProducedPartitions(transactionMetadataEntry.getPartitionsList()); - 
txnMetaMap.get(transactionId).getRight().add(position); - } - break; - case ADD_SUBSCRIPTION: - if (!txnMetaMap.containsKey(transactionId)) { - transactionLog.deletePosition(Collections.singletonList(position)); - } else { - txnMetaMap.get(transactionId).getLeft() - .addAckedPartitions(subscriptionToTxnSubscription( - transactionMetadataEntry.getSubscriptionsList())); - txnMetaMap.get(transactionId).getRight().add(position); - } - break; - case UPDATE: - if (!txnMetaMap.containsKey(transactionId)) { - transactionLog.deletePosition(Collections.singletonList(position)); - } else { - TxnStatus newStatus = transactionMetadataEntry.getNewStatus(); - txnMetaMap.get(transactionId).getLeft() - .updateTxnStatus(transactionMetadataEntry.getNewStatus(), - transactionMetadataEntry.getExpectedStatus()); - txnMetaMap.get(transactionId).getRight().add(position); - recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus); - if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - transactionLog.deletePosition(txnMetaMap - .get(transactionId).getRight()).thenAccept(v -> - txnMetaMap.remove(transactionId).getLeft()); + long transactionId = transactionMetadataEntry.getTxnidLeastBits(); + switch (transactionMetadataEntry.getMetadataOp()) { + case NEW: + long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits(); + if (txnMetaMap.containsKey(transactionId)) { + txnMetaMap.get(transactionId).getRight().add(position); + } else { + List<Position> positions = new ArrayList<>(); + positions.add(position); + long openTimestamp = transactionMetadataEntry.getStartTime(); + long timeoutAt = transactionMetadataEntry.getTimeoutMs(); + txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID, + openTimestamp, timeoutAt), positions)); + recoverTracker.handleOpenStatusTransaction(txnSequenceId, + timeoutAt + openTimestamp); } - } - break; - default: - throw new InvalidTxnStatusException("Transaction `" - + txnID + "` load replay metadata 
operation " - + "from transaction log with unknown operation"); + break; + case ADD_PARTITION: + if (!txnMetaMap.containsKey(transactionId)) { + transactionLog.deletePosition(Collections.singletonList(position)); + } else { + txnMetaMap.get(transactionId).getLeft() + .addProducedPartitions(transactionMetadataEntry.getPartitionsList()); + txnMetaMap.get(transactionId).getRight().add(position); + } + break; + case ADD_SUBSCRIPTION: + if (!txnMetaMap.containsKey(transactionId)) { + transactionLog.deletePosition(Collections.singletonList(position)); + } else { + txnMetaMap.get(transactionId).getLeft() + .addAckedPartitions(subscriptionToTxnSubscription( + transactionMetadataEntry.getSubscriptionsList())); + txnMetaMap.get(transactionId).getRight().add(position); + } + break; + case UPDATE: + if (!txnMetaMap.containsKey(transactionId)) { + transactionLog.deletePosition(Collections.singletonList(position)); + } else { + TxnStatus newStatus = transactionMetadataEntry.getNewStatus(); + txnMetaMap.get(transactionId).getLeft() + .updateTxnStatus(transactionMetadataEntry.getNewStatus(), + transactionMetadataEntry.getExpectedStatus()); + txnMetaMap.get(transactionId).getRight().add(position); + recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus); + if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { + transactionLog.deletePosition(txnMetaMap + .get(transactionId).getRight()).thenAccept(v -> + txnMetaMap.remove(transactionId).getLeft()); + } + } + break; + default: + throw new InvalidTxnStatusException("Transaction `" + + txnID + "` load replay metadata operation " + + "from transaction log with unknown operation"); + } + } catch (InvalidTxnStatusException e) { + transactionLog.deletePosition(Collections.singletonList(position)); + log.error(e.getMessage(), e); } - } catch (InvalidTxnStatusException e) { - transactionLog.deletePosition(Collections.singletonList(position)); - log.error(e.getMessage(), e); } - } - })); + })); + } + 
return completableFuture; } @Override diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index f0c32d26482..22a58ebcc97 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -51,8 +51,8 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt managedLedgerFactory, managedLedgerConfig); // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. - return txnLog.initialize().thenApply(__ -> + return txnLog.initialize().thenCompose(__ -> new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, - recoverTracker, mlTransactionSequenceIdGenerator)); + mlTransactionSequenceIdGenerator).init(recoverTracker)); } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index a06bf9e6dea..aafe54e6069 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -74,8 +74,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new 
MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); int checkReplayRetryCount = 0; while (true) { checkReplayRetryCount++; @@ -149,8 +150,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); TxnID txnID = transactionMetadataStore.newTransaction(20000).get(); @@ -178,8 +179,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); txnID = transactionMetadataStore.newTransaction(100000).get(); @@ -201,10 +202,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig); mlTransactionLog.initialize().join(); + MLTransactionMetadataStore transactionMetadataStore = new 
MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); int checkReplayRetryCount = 0; while (true) { if (checkReplayRetryCount > 3) { @@ -244,10 +246,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig); txnLog2.initialize().join(); + MLTransactionMetadataStore transactionMetadataStoreTest = new MLTransactionMetadataStore(transactionCoordinatorID, - txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStoreTest.init(new TransactionRecoverTrackerImpl()).get(); while (true) { if (checkReplayRetryCount > 6) { @@ -315,8 +318,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); int checkReplayRetryCount = 0; while (true) { if (checkReplayRetryCount > 3) { @@ -382,9 +385,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new 
TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); - + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); @@ -401,8 +403,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); } @@ -423,8 +425,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); transactionMetadataStore.newTransaction(5000).get();