massakam opened a new pull request #8877:
URL: https://github.com/apache/pulsar/pull/8877


   ### Motivation
   
   Some of our broker servers experienced what appears to be a deadlock. A 
thread dump taken at that time is attached below.
   
   
[threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)
   
   The thread "ForkJoinPool.commonPool-worker-120" held the lock on an instance 
of `ManagedLedgerImpl` while it appeared to be waiting for the write lock on 
`subscriptions`, which is an instance of `ConcurrentOpenHashMap`. Because the 
lock on the `ManagedLedgerImpl` instance was never released, many other threads 
were blocked as well.
   
   ```
   "ForkJoinPool.commonPool-worker-120" #903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
           at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
           at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
           at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
           - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
           at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
           at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
           at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   ```
   
   The thread holding the lock on `subscriptions` appears to be 
"pulsar-msg-expiry-monitor-24-1".
   ```
   "pulsar-msg-expiry-monitor-24-1" #304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
      java.lang.Thread.State: TIMED_WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
           at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
           at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
           at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
           at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
           at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
           at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
           at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
           at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
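   I don't understand why "pulsar-msg-expiry-monitor-24-1" was stalled. 
According to the dump, it was parked in `CountDownLatch.await()` inside 
`ManagedCursorImpl.getNthEntry()`, which was called from within 
`ConcurrentOpenHashMap.forEach()`. However, it seems that this deadlock can be 
avoided if `subscriptions` is not locked while checking for message expiration, 
so I created this PR. If anyone understands why 
"pulsar-msg-expiry-monitor-24-1" was stalled, please let me know.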
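
The suspected blocking pattern can be reproduced in miniature. The sketch below is only an illustration: `LockedSection` is a hypothetical, simplified stand-in for one section of a lock-guarded map (it is not Pulsar's `ConcurrentOpenHashMap`), and it assumes that `forEach()` keeps the section's `StampedLock` held while the callback runs, as the dump suggests. Under that assumption, any writer calling `put()` cannot proceed until the callback returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;

// Hypothetical stand-in for one section of a lock-guarded map; NOT Pulsar's class.
final class LockedSection<V> {
    private final StampedLock lock = new StampedLock();
    private final List<V> values = new ArrayList<>();

    void put(V value) {
        long stamp = lock.writeLock();
        try {
            values.add(value);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Assumption: the callback runs while the read lock is held.
    void forEach(Consumer<? super V> action) {
        long stamp = lock.readLock();
        try {
            values.forEach(action);
        } finally {
            lock.unlockRead(stamp);
        }
    }

    // Non-blocking probe: could put() take the write lock right now?
    boolean writerCouldProceed() {
        long stamp = lock.tryWriteLock();
        if (stamp == 0L) {
            return false;
        }
        lock.unlockWrite(stamp);
        return true;
    }
}

final class ForEachLockHazard {
    // Returns true if a writer would have been blocked while the callback ran.
    static boolean writerBlockedDuringCallback() {
        LockedSection<String> subscriptions = new LockedSection<>();
        subscriptions.put("sub-a");
        boolean[] blocked = {false};
        subscriptions.forEach(sub -> blocked[0] = !subscriptions.writerCouldProceed());
        return blocked[0];
    }

    public static void main(String[] args) {
        System.out.println("writer blocked during callback: " + writerBlockedDuringCallback());
    }
}
```

If the callback itself waits on something slow, as `getNthEntry()` does in the dump, every writer waits at least that long, and so does anything queued behind a lock that the writer already holds.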
   
   ### Modifications
   
   When expiring messages for each subscription, copy the values of 
`subscriptions` into a `List` and call `forEach()` on that `List` instead of on 
the map itself.
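
A minimal sketch of the snapshot-then-iterate idea follows. It is not the actual patch: `SnapshotSection` and its `values()` method are hypothetical stand-ins written for this example, and the sketch assumes the copy is taken under the lock and that the lock is released before any per-subscription work begins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.StampedLock;

// Hypothetical stand-in for a lock-guarded collection; NOT Pulsar's class.
final class SnapshotSection<V> {
    private final StampedLock lock = new StampedLock();
    private final List<V> items = new ArrayList<>();

    void put(V value) {
        long stamp = lock.writeLock();
        try {
            items.add(value);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Copies the current values under the read lock, then releases the lock.
    List<V> values() {
        long stamp = lock.readLock();
        try {
            return new ArrayList<>(items);
        } finally {
            lock.unlockRead(stamp);
        }
    }

    // Non-blocking probe: could put() take the write lock right now?
    boolean writerCouldProceed() {
        long stamp = lock.tryWriteLock();
        if (stamp == 0L) {
            return false;
        }
        lock.unlockWrite(stamp);
        return true;
    }
}

final class SnapshotIterationSketch {
    // Returns true if a writer could have proceeded during every callback.
    static boolean writerFreeDuringIteration() {
        SnapshotSection<String> subscriptions = new SnapshotSection<>();
        subscriptions.put("sub-a");
        subscriptions.put("sub-b");
        boolean[] free = {true};
        // The loop runs over the copy, so no lock is held inside the callback.
        List<String> snapshot = subscriptions.values();
        snapshot.forEach(sub -> free[0] &= subscriptions.writerCouldProceed());
        return free[0];
    }

    public static void main(String[] args) {
        System.out.println("writer free during iteration: " + writerFreeDuringIteration());
    }
}
```

The trade-off is that the loop sees a point-in-time copy: a subscription added during the pass is picked up only on the next check, and one removed during the pass may still be visited once.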



