k2la opened a new pull request #6728: [pulsar-broker] Fix Deadlock by Consumer 
and Reader
URL: https://github.com/apache/pulsar/pull/6728
 
 
   ### Motivation
   
   Brokers became unable to accept client connections when consumers and 
readers connected to them at almost the same time. 
   This happened in v2.4.2 and on the master branch. 
   
   A thread dump taken at that time shows the following:
   ```
   "bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 
tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x0000000750c51a00> (a 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
           at 
java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
           at 
java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
           - locked <0x0000000750c53a00> (a 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown 
Source)
           at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
           at 
org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
           at 
org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
           at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
           at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
           at 
org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
           at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
           at 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
           at 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
           at 
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
           at 
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown
 Source)
           at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
           at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   
   ...
   
   "ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 
tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
      java.lang.Thread.State: BLOCKED (on object monitor)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
           - waiting to lock <0x0000000750c53a00> (a 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown 
Source)
           at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
           at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown 
Source)
           at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
           at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
           at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   ``` 
   `PersistentTopic#getDurableSubscription` locks `ConcurrentOpenHashMap` 
after locking `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => 
`ConcurrentOpenHashMap`)
   On the other hand, `PersistentTopic#getNonDurableSubscription` tries to lock 
`ManagedLedgerImpl` while already holding the `ConcurrentOpenHashMap` lock. ( 
`ConcurrentOpenHashMap` => `ManagedLedgerImpl`)
   The two paths take the same locks in opposite order, so each thread can end 
up waiting for the lock the other holds. This appears to be the cause of the 
deadlock.
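
The lock-order inversion described above can be reproduced in miniature. The following is only a sketch, not Pulsar code: `ledgerLock` and `mapSectionLock` are hypothetical stand-ins for the `ManagedLedgerImpl` monitor and the `ConcurrentOpenHashMap$Section` lock, and `tryLock` with a timeout replaces the blocking acquisition so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockInversionSketch {

    // Runs two threads that take the same two locks in opposite order and
    // returns how many of them failed to obtain their second lock.
    static int countBlockedThreads() {
        // Hypothetical stand-ins for the two locks seen in the thread dump.
        ReentrantLock ledgerLock = new ReentrantLock();
        ReentrantLock mapSectionLock = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        AtomicInteger blocked = new AtomicInteger();

        // Durable path: ledger lock first, then map lock.
        Thread durablePath = new Thread(() ->
                lockInOrder(ledgerLock, mapSectionLock, bothHoldFirst, bothTried, blocked));
        // Non-durable path before the fix: map lock first, then ledger lock.
        Thread nonDurablePath = new Thread(() ->
                lockInOrder(mapSectionLock, ledgerLock, bothHoldFirst, bothTried, blocked));
        durablePath.start();
        nonDurablePath.start();
        try {
            durablePath.join();
            nonDurablePath.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return blocked.get();
    }

    static void lockInOrder(ReentrantLock first, ReentrantLock second,
                            CountDownLatch bothHoldFirst, CountDownLatch bothTried,
                            AtomicInteger blocked) {
        first.lock();
        try {
            // Wait until both threads hold their first lock.
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // A plain lock() here would wait forever; the timeout keeps the demo finite.
            if (second.tryLock(200, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.incrementAndGet();
            }
            // Keep holding the first lock until the other thread has also tried.
            bothTried.countDown();
            bothTried.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads blocked on their second lock: " + countBlockedThreads());
    }
}
```

In the broker the second acquisition has no timeout, so both threads would stay parked indefinitely, as the thread dump shows.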
   
   ### Modifications
   Changed `PersistentTopic#getNonDurableSubscription` so that it locks 
`ManagedLedgerImpl` before `ConcurrentOpenHashMap`, the same order that 
`PersistentTopic#getDurableSubscription` uses. ( `ManagedLedgerImpl` => 
`ConcurrentOpenHashMap`)
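
One way to get a consistent order is to do the work that needs the ledger lock before entering the map's `computeIfAbsent`. The sketch below illustrates that idea only and is not the actual patch: `ledgerMonitor`, `newCursor`, and the `String` values are hypothetical, and `java.util.concurrent.ConcurrentHashMap` stands in for Pulsar's `ConcurrentOpenHashMap`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LockOrderFixSketch {
    // Hypothetical stand-in for the ManagedLedgerImpl monitor.
    static final Object ledgerMonitor = new Object();
    // Stand-in for the topic's subscription map.
    static final ConcurrentMap<String, String> subscriptions = new ConcurrentHashMap<>();

    // Hypothetical cursor factory that needs the ledger lock.
    static String newCursor(String name) {
        synchronized (ledgerMonitor) {
            return "cursor-for-" + name;
        }
    }

    // Non-durable path after the change: the ledger lock is taken (and released)
    // before the map lock, so the mapping function never touches the ledger lock.
    static String getNonDurableSubscription(String name) {
        String cursor = newCursor(name);
        return subscriptions.computeIfAbsent(name, k -> "subscription(" + cursor + ")");
    }

    // Durable path: ledger lock first, then map lock, as in the thread dump.
    static String getDurableSubscription(String name) {
        synchronized (ledgerMonitor) {
            return subscriptions.computeIfAbsent(name, k -> "subscription(cursor-for-" + k + ")");
        }
    }

    public static void main(String[] args) {
        System.out.println(getNonDurableSubscription("reader-1"));
        System.out.println(getDurableSubscription("consumer-1"));
    }
}
```

A trade-off of this shape is that a cursor may be created and then go unused when another thread has already registered the same subscription name, so a real implementation would need to clean it up.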

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
