This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 14a731e88f46f5c565acafc1b9944b4e7a8293be Author: Matteo Merli <[email protected]> AuthorDate: Fri Jul 2 18:56:14 2021 +0200 Fixed possible deadlock in the initialization of MLTransactionLog (#11194) * Fixed possible deadlock in the initialization of MLTransactionLog * Fixed tests (cherry picked from commit 555042a45d9692148c302fbdedbb47f1f2572a48) --- .../broker/stats/ManagedLedgerMetricsTest.java | 3 +- .../transaction/coordinator/TransactionLog.java | 5 +++ .../coordinator/impl/MLTransactionLogImpl.java | 46 +++++++++++++++++++--- .../impl/MLTransactionMetadataStoreProvider.java | 17 +++----- .../MLTransactionMetadataStoreTest.java | 13 +++++- 5 files changed, 64 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java index 9d19e3e..98ac261 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -103,7 +103,8 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); managedLedgerConfig.setMaxEntriesPerLedger(2); new MLTransactionLogImpl(TransactionCoordinatorID.get(0), - pulsar.getManagedLedgerFactory(), managedLedgerConfig); + pulsar.getManagedLedgerFactory(), managedLedgerConfig) + .initialize().join(); ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar); metrics.generate(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java index 7f70c28..a752c3b 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java @@ -27,6 +27,11 @@ import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry; */ public interface TransactionLog { + /** + * Initialize the TransactionLog implementation + */ + CompletableFuture<Void> initialize(); + /** * Replay transaction log to load the transaction map. diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 4820805..4098cfd 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -52,11 +52,13 @@ public class MLTransactionLogImpl implements TransactionLog { private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class); - private final ManagedLedger managedLedger; + private final ManagedLedgerFactory managedLedgerFactory; + private final ManagedLedgerConfig managedLedgerConfig; + private ManagedLedger managedLedger; public final static String TRANSACTION_LOG_PREFIX = "__transaction_log_"; - private final ManagedCursor cursor; + private ManagedCursor cursor; public final static String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription"; @@ -70,19 +72,51 @@ public class MLTransactionLogImpl implements TransactionLog { public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, - ManagedLedgerConfig managedLedgerConfig) throws Exception { + ManagedLedgerConfig managedLedgerConfig) { this.topicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId()); this.tcId = tcID.getId(); this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor(); managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor); - this.managedLedger = managedLedgerFactory.open(topicName.getPersistenceNamingEncoding(), managedLedgerConfig); - this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME, - CommandSubscribe.InitialPosition.Earliest); + this.managedLedgerFactory = managedLedgerFactory; + this.managedLedgerConfig = managedLedgerConfig; this.entryQueue = new SpscArrayQueue<>(2000); } @Override + public CompletableFuture<Void> initialize() { + CompletableFuture<Void> future = new CompletableFuture<>(); + managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), + managedLedgerConfig, + new AsyncCallbacks.OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + MLTransactionLogImpl.this.managedLedger = ledger; + + managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME, + CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + MLTransactionLogImpl.this.cursor = cursor; + future.complete(null); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, null); + return future; + } + + @Override public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) { new TransactionLogReplayer(transactionLogReplayCallback).start(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index bdf0d56..36b1958 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionLog; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; @@ -44,16 +45,10 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker) { - TransactionMetadataStore transactionMetadataStore; - try { - transactionMetadataStore = - new MLTransactionMetadataStore(transactionCoordinatorId, - new MLTransactionLogImpl(transactionCoordinatorId, - managedLedgerFactory, managedLedgerConfig), timeoutTracker, recoverTracker); - } catch (Exception e) { - log.error("MLTransactionMetadataStore init fail", e); - return FutureUtil.failedFuture(e); - } - return CompletableFuture.completedFuture(transactionMetadataStore); + MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId, + managedLedgerFactory, managedLedgerConfig); + + return txnLog.initialize().thenApply(__ -> + new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker)); } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 5316470..e9b3e0c 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -67,6 +67,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -138,6 +139,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { managedLedgerConfig.setMaxEntriesPerLedger(3); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -181,6 +183,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { managedLedgerConfig.setMaxEntriesPerLedger(2); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -220,10 +223,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { transactionMetadataStore.closeAsync(); + MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory, + new ManagedLedgerConfig()); + txnLog2.initialize().join(); MLTransactionMetadataStore transactionMetadataStoreTest = new MLTransactionMetadataStore(transactionCoordinatorID, - new MLTransactionLogImpl(transactionCoordinatorID, factory, - new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); + txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); while (true) { if (checkReplayRetryCount > 6) { @@ -285,6 +290,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -347,6 +353,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -364,6 +371,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()); + mlTransactionLog.initialize().join(); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl()); @@ -381,6 +389,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()); + mlTransactionLog.initialize().join(); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
