This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14a731e88f46f5c565acafc1b9944b4e7a8293be
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 2 18:56:14 2021 +0200

    Fixed possible deadlock in the initialization of MLTransactionLog (#11194)
    
    * Fixed possible deadlock in the initialization of MLTransactionLog
    
    * Fixed tests
    
    (cherry picked from commit 555042a45d9692148c302fbdedbb47f1f2572a48)
---
 .../broker/stats/ManagedLedgerMetricsTest.java     |  3 +-
 .../transaction/coordinator/TransactionLog.java    |  5 +++
 .../coordinator/impl/MLTransactionLogImpl.java     | 46 +++++++++++++++++++---
 .../impl/MLTransactionMetadataStoreProvider.java   | 17 +++-----
 .../MLTransactionMetadataStoreTest.java            | 13 +++++-
 5 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 9d19e3e..98ac261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -103,7 +103,8 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig);
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
+                .initialize().join();
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
index 7f70c28..a752c3b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
@@ -27,6 +27,11 @@ import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
  */
 public interface TransactionLog {
 
+    /**
+     * Initialize the TransactionLog implementation
+     */
+    CompletableFuture<Void> initialize();
+
 
     /**
      * Replay transaction log to load the transaction map.
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 4820805..4098cfd 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -52,11 +52,13 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class);
 
-    private final ManagedLedger managedLedger;
+    private final ManagedLedgerFactory managedLedgerFactory;
+    private final ManagedLedgerConfig managedLedgerConfig;
+    private ManagedLedger managedLedger;
 
     public final static String TRANSACTION_LOG_PREFIX = "__transaction_log_";
 
-    private final ManagedCursor cursor;
+    private ManagedCursor cursor;
 
     public final static String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription";
 
@@ -70,19 +72,51 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) throws Exception {
+                                ManagedLedgerConfig managedLedgerConfig) {
         this.topicName = TopicName.get(TopicDomain.persistent.value(),
                 NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
         this.tcId = tcID.getId();
         this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
         managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
-        this.managedLedger = managedLedgerFactory.open(topicName.getPersistenceNamingEncoding(), managedLedgerConfig);
-        this.cursor =  managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
-                CommandSubscribe.InitialPosition.Earliest);
+        this.managedLedgerFactory = managedLedgerFactory;
+        this.managedLedgerConfig = managedLedgerConfig;
         this.entryQueue = new SpscArrayQueue<>(2000);
     }
 
     @Override
+    public CompletableFuture<Void> initialize() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(),
+                managedLedgerConfig,
+                new AsyncCallbacks.OpenLedgerCallback() {
+                    @Override
+                    public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                        MLTransactionLogImpl.this.managedLedger = ledger;
+
+                        managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME,
+                                CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {
+                                    @Override
+                                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                                        MLTransactionLogImpl.this.cursor = cursor;
+                                        future.complete(null);
+                                    }
+
+                                    @Override
+                                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                                        future.completeExceptionally(exception);
+                                    }
+                                }, null);
+                    }
+
+                    @Override
+                    public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                        future.completeExceptionally(exception);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    @Override
     public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) {
         new TransactionLogReplayer(transactionLogReplayCallback).start();
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index bdf0d56..36b1958 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
@@ -44,16 +45,10 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker) {
-        TransactionMetadataStore transactionMetadataStore;
-        try {
-            transactionMetadataStore =
-                    new MLTransactionMetadataStore(transactionCoordinatorId,
-                            new MLTransactionLogImpl(transactionCoordinatorId,
-                                    managedLedgerFactory, managedLedgerConfig), timeoutTracker, recoverTracker);
-        } catch (Exception e) {
-            log.error("MLTransactionMetadataStore init fail", e);
-            return FutureUtil.failedFuture(e);
-        }
-        return CompletableFuture.completedFuture(transactionMetadataStore);
+        MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
+                managedLedgerFactory, managedLedgerConfig);
+
+        return txnLog.initialize().thenApply(__ ->
+                new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 5316470..e9b3e0c 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -67,6 +67,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 new ManagedLedgerConfig());
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -138,6 +139,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -181,6 +183,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -220,10 +223,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
                 transactionMetadataStore.closeAsync();
 
+                MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                        new ManagedLedgerConfig());
+                txnLog2.initialize().join();
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
-                                new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                                        new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                                txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -285,6 +290,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 new ManagedLedgerConfig());
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -347,6 +353,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 new ManagedLedgerConfig());
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -364,6 +371,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 new ManagedLedgerConfig());
+        mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
@@ -381,6 +389,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 new ManagedLedgerConfig());
+        mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());

Reply via email to