This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d30414  Fix deadlock by consumer and reader (#6728)
6d30414 is described below

commit 6d304140b6205c6a2e94ad34bfd3cc5d16aca5d1
Author: k2la <[email protected]>
AuthorDate: Tue Apr 14 09:11:51 2020 +0900

    Fix deadlock by consumer and reader (#6728)
    
    ### Motivation
    
    Brokers could no longer connect clients when a consumer and a reader connected to the same topic at almost the same time.
    This happened on v2.4.2 and on the master branch.
    
    The thread dump taken at that time showed the following:
    ```
    "bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
            at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
            at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
            - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
            at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
            at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
            at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
            at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
            at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
            at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
            at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
            at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    
    ...
    
    "ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
            - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
            at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
            at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
            at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
            at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    ```
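    `PersistentTopic#getDurableSubscription` holds the `ManagedLedgerImpl` monitor (taken in `ManagedLedgerImpl#asyncOpenCursor`), and its `openCursorComplete` callback then waits for the write lock of a `ConcurrentOpenHashMap` section inside `computeIfAbsent`.
    ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
    
    On the other hand, `PersistentTopic#getNonDurableSubscription` holds the section write lock inside `subscriptions.computeIfAbsent`, and the mapping function then waits for the `ManagedLedgerImpl` monitor in `ManagedLedgerImpl#newNonDurableCursor`.
    ( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)
    
    Because the two paths acquire the same two locks in opposite order, each thread waits for a lock held by the other, and the threads deadlock.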
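A minimal, self-contained Java sketch of the lock-order inversion described above. It is not Pulsar code: a plain `Object` monitor stands in for the `ManagedLedgerImpl` lock, and a `java.util.concurrent.locks.StampedLock` stands in for the `ConcurrentOpenHashMap` section lock (the thread dump shows `ConcurrentOpenHashMap$Section.put` parked in `StampedLock.writeLock`). The class and method names are hypothetical. To keep the demo from hanging, the "durable" thread uses a timed `tryWriteLock` instead of `writeLock`; a failed attempt marks the point where the broker threads would have blocked forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// Hypothetical reproduction of the lock-order inversion; not Pulsar code.
public class LockOrderInversionDemo {

    private final Object ledger = new Object();                // stands in for ManagedLedgerImpl
    private final StampedLock sectionLock = new StampedLock(); // stands in for a ConcurrentOpenHashMap section

    // Returns true if the "durable" thread could not get the section lock while holding the monitor.
    public boolean run() {
        CountDownLatch ledgerHeld = new CountDownLatch(1);
        CountDownLatch sectionHeld = new CountDownLatch(1);
        boolean[] timedOut = new boolean[1];

        // Like getDurableSubscription: ledger monitor first, then the section write lock.
        Thread durable = new Thread(() -> {
            synchronized (ledger) {
                ledgerHeld.countDown();
                try {
                    sectionHeld.await();
                    // writeLock() would park forever here; the timed attempt lets the demo finish.
                    long stamp = sectionLock.tryWriteLock(200, TimeUnit.MILLISECONDS);
                    if (stamp == 0L) {
                        timedOut[0] = true;
                    } else {
                        sectionLock.unlockWrite(stamp);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "durable");

        // Like getNonDurableSubscription before the fix: section write lock first, then the monitor.
        Thread nonDurable = new Thread(() -> {
            long stamp = sectionLock.writeLock();
            try {
                sectionHeld.countDown();
                ledgerHeld.await();
                synchronized (ledger) {
                    // Reached only after the durable thread gives up and leaves the monitor.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sectionLock.unlockWrite(stamp);
            }
        }, "non-durable");

        durable.start();
        nonDurable.start();
        try {
            durable.join();
            nonDurable.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return timedOut[0];
    }

    public static void main(String[] args) {
        System.out.println("would deadlock: " + new LockOrderInversionDemo().run()); // prints "would deadlock: true"
    }
}
```

The latches make the interleaving deterministic: each thread acquires its first lock before either asks for its second, which is exactly the window the two broker threads hit.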
    
    ### Modifications
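    `PersistentTopic#getNonDurableSubscription` now wraps `subscriptions.computeIfAbsent` in `synchronized (ledger)`, so it takes the `ManagedLedgerImpl` monitor before the `ConcurrentOpenHashMap` section lock, the same order as `getDurableSubscription`. The later call to `ledger.newNonDurableCursor` re-enters a monitor the thread already owns, so it no longer blocks. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)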
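A matching sketch of the lock order used by the fix, under the same assumptions (hypothetical names; an `Object` monitor for `ManagedLedgerImpl`, a `StampedLock` for the section lock). Both paths take the monitor first. The non-durable path also re-enters the monitor while holding the section lock, as `newNonDurableCursor` does, which is safe because Java monitors are reentrant for the thread that already owns them.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// Hypothetical sketch of the fixed lock order (ledger monitor => section lock); not Pulsar code.
public class ConsistentLockOrderDemo {

    private final Object ledger = new Object();                // stands in for ManagedLedgerImpl
    private final StampedLock sectionLock = new StampedLock(); // stands in for a ConcurrentOpenHashMap section
    private int subscriptionsCreated = 0;                      // guarded by sectionLock's write lock

    // Runs the action under the section write lock, like computeIfAbsent calling its mapping function.
    private void withSectionLock(Runnable action) {
        long stamp = sectionLock.writeLock();
        try {
            action.run();
        } finally {
            sectionLock.unlockWrite(stamp);
        }
    }

    // Durable path: asyncOpenCursor holds the monitor, then its callback calls computeIfAbsent.
    private void durablePath() {
        synchronized (ledger) {
            withSectionLock(() -> subscriptionsCreated++);
        }
    }

    // Non-durable path after the fix: synchronized (ledger) around computeIfAbsent.
    private void nonDurablePathFixed() {
        synchronized (ledger) {
            withSectionLock(() -> {
                // Like newNonDurableCursor: re-entering a monitor this thread already owns does not block.
                synchronized (ledger) {
                    subscriptionsCreated++;
                }
            });
        }
    }

    // Runs both paths concurrently and returns the number of increments performed.
    public int run(int iterationsPerThread) {
        Thread durable = new Thread(() -> {
            for (int i = 0; i < iterationsPerThread; i++) {
                durablePath();
            }
        }, "durable");
        Thread nonDurable = new Thread(() -> {
            for (int i = 0; i < iterationsPerThread; i++) {
                nonDurablePathFixed();
            }
        }, "non-durable");
        // Daemon threads so that a hang here could not keep the JVM alive.
        durable.setDaemon(true);
        nonDurable.setDaemon(true);
        durable.start();
        nonDurable.start();
        try {
            durable.join(TimeUnit.SECONDS.toMillis(10));
            nonDurable.join(TimeUnit.SECONDS.toMillis(10));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (durable.isAlive() || nonDurable.isAlive()) {
            throw new IllegalStateException("threads did not finish: possible deadlock");
        }
        return subscriptionsCreated;
    }

    public static void main(String[] args) {
        System.out.println(new ConsistentLockOrderDemo().run(10_000)); // prints 20000
    }
}
```

Because every thread that needs both locks takes them in the same order, no cycle of waiting threads can form. `StampedLock` itself is not reentrant, so the sketch never re-acquires the section lock while holding it.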
---
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
 1 file changed, 39 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1b2822f..c5c5cee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -692,49 +692,51 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
-        // Create a new non-durable cursor only for the first consumer that connects
-        Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
-            MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
-                    : (MessageIdImpl) MessageId.latest;
-
-            long ledgerId = msgId.getLedgerId();
-            long entryId = msgId.getEntryId();
-            if (ledgerId >= 0
-                && msgId instanceof BatchMessageIdImpl) {
-                // When the start message is relative to a batch, we need to take one step back on the previous message,
-                // because the "batch" might not have been consumed in its entirety.
-                // The client will then be able to discard the first messages if needed.
-                entryId = msgId.getEntryId() - 1;
-            }
+        synchronized (ledger) {
+            // Create a new non-durable cursor only for the first consumer that connects
+            Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+                MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
+                        : (MessageIdImpl) MessageId.latest;
+
+                long ledgerId = msgId.getLedgerId();
+                long entryId = msgId.getEntryId();
+                if (ledgerId >= 0
+                        && msgId instanceof BatchMessageIdImpl) {
+                    // When the start message is relative to a batch, we need to take one step back on the previous message,
+                    // because the "batch" might not have been consumed in its entirety.
+                    // The client will then be able to discard the first messages if needed.
+                    entryId = msgId.getEntryId() - 1;
+                }
 
-            Position startPosition = new PositionImpl(ledgerId, entryId);
-            ManagedCursor cursor = null;
-            try {
-                cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
-            } catch (ManagedLedgerException e) {
-                subscriptionFuture.completeExceptionally(e);
-            }
+                Position startPosition = new PositionImpl(ledgerId, entryId);
+                ManagedCursor cursor = null;
+                try {
+                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
+                } catch (ManagedLedgerException e) {
+                    subscriptionFuture.completeExceptionally(e);
+                }
 
-            return new PersistentSubscription(this, subscriptionName, cursor, false);
-        });
+                return new PersistentSubscription(this, subscriptionName, cursor, false);
+            });
 
-        if (!subscriptionFuture.isDone()) {
-            if (startMessageRollbackDurationSec > 0) {
-                long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
-                subscription.resetCursor(timestamp).handle((s, ex) -> {
-                    if (ex != null) {
-                        log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
-                                startMessageRollbackDurationSec);
-                    }
+            if (!subscriptionFuture.isDone()) {
+                if (startMessageRollbackDurationSec > 0) {
+                    long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+                    subscription.resetCursor(timestamp).handle((s, ex) -> {
+                        if (ex != null) {
+                            log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+                                    startMessageRollbackDurationSec);
+                        }
+                        subscriptionFuture.complete(subscription);
+                        return null;
+                    });
+                } else {
                     subscriptionFuture.complete(subscription);
-                    return null;
-                });
+                }
             } else {
-                subscriptionFuture.complete(subscription);
+                // failed to initialize managed-cursor: clean up created subscription
+                subscriptions.remove(subscriptionName);
             }
-        } else {
-            // failed to initialize managed-cursor: clean up created subscription
-            subscriptions.remove(subscriptionName);
         }
 
         return subscriptionFuture;
