massakam opened a new pull request #7841:
URL: https://github.com/apache/pulsar/pull/7841


   ### Motivation
   
   A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the 
following two threads:
   ```
   "prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 
nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
      java.lang.Thread.State: BLOCKED (on object monitor)
           at 
org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
           - waiting to lock <0x00007f913d098dd0> (a 
org.apache.pulsar.broker.service.persistent.PersistentSubscription)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at 
org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
           at 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
           at 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
           at 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown
 Source)
           at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
           at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   
   "ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 
tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00007f913d08b5c8> (a 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
           at 
java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
           at 
java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
           at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
           at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
           - locked <0x00007f91120dc258> (a 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
           at 
org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
           - locked <0x00007f913d098dd0> (a 
org.apache.pulsar.broker.service.persistent.PersistentSubscription)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
           at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
           at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown 
Source)
           at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
           at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown 
Source)
           at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
           at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
           at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
           at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
   ```
   
   `prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and 
`PersistentDispatcherMultipleConsumers`) after locking `ConcurrentOpenHashMap`.
   
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129
   
   On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock 
these instances in reverse order.
   
   Actually, `PersistentSubscription#getNumberOfEntriesDelayed()` is no longer 
used in the master code, but it seems that this deadlock has not yet been 
resolved.
   
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
   
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
   
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809
   
   ### Modifications
   
   Moved the `isConsumersExceededOnTopic()` method to check the number of 
connected consumers when adding a consumer to the topic from `Dispatcher` to 
`AbstractTopic`. Avoid the deadlock mentioned above by calling 
`PersistentTopic#getNumberOfConsumers()` before locking the 
`PersistentSubscription` instance.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to