massakam opened a new pull request #7841:
URL: https://github.com/apache/pulsar/pull/7841
### Motivation
A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the
following two threads:
```
"prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
"ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```
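`prometheus-stats-36-1` was iterating inside `ConcurrentOpenHashMap#forEach()`,
so it had already locked a `ConcurrentOpenHashMap` section, and was waiting for
the monitor of the `PersistentSubscription` instance `<0x00007f913d098dd0>`
(and, after that, `PersistentDispatcherMultipleConsumers`).
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129
`ForkJoinPool.commonPool-worker-104`, on the other hand, already held the
monitors of that same `PersistentSubscription` and of
`PersistentDispatcherMultipleConsumers`, and was waiting for the read lock of a
`ConcurrentOpenHashMap` section. In other words, it was locking these
instances in the reverse order.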
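The lock-order inversion described above can be reproduced in miniature. The following is only a sketch under simplifying assumptions, not Pulsar code: `mapLock` and `subscriptionLock` are hypothetical stand-ins for the map section lock and the `PersistentSubscription` monitor, and both are plain `ReentrantLock`s so that `tryLock` with a timeout can be used to observe the blocking without actually hanging the JVM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderInversionSketch {
    // Hypothetical stand-ins for the two locks seen in the thread dump.
    static final ReentrantLock mapLock = new ReentrantLock();
    static final ReentrantLock subscriptionLock = new ReentrantLock();

    /** Returns true if neither thread could acquire its second lock. */
    static boolean bothThreadsBlocked() throws InterruptedException {
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] gotSecond = new boolean[2];

        // "stats" locks map -> subscription, "subscribe" locks subscription -> map.
        Thread stats = new Thread(() ->
                gotSecond[0] = lockInOrder(mapLock, subscriptionLock, bothHoldFirst, bothTried));
        Thread subscribe = new Thread(() ->
                gotSecond[1] = lockInOrder(subscriptionLock, mapLock, bothHoldFirst, bothTried));
        stats.start();
        subscribe.start();
        stats.join();
        subscribe.join();
        return !gotSecond[0] && !gotSecond[1];
    }

    static boolean lockInOrder(ReentrantLock first, ReentrantLock second,
                               CountDownLatch bothHoldFirst, CountDownLatch bothTried) {
        first.lock();
        try {
            // Wait until the other thread also holds its first lock.
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // A plain lock() here would block forever; tryLock just times out.
            boolean acquired = second.tryLock(200, TimeUnit.MILLISECONDS);
            if (acquired) {
                second.unlock();
            }
            // Keep holding the first lock until both threads have tried.
            bothTried.countDown();
            bothTried.await();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            first.unlock();
        }
    }
}
```

With untimed acquisition (a `synchronized` block or `lock()`), the same interleaving would leave both threads waiting on each other indefinitely, which is the situation the thread dump shows.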
`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used on
master, but the deadlock itself does not appear to have been resolved there;
the same locks can still be taken in conflicting order in the following code:
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809
### Modifications
Moved `isConsumersExceededOnTopic()`, the method that checks the number of
connected consumers when a consumer is added to a topic, from `Dispatcher` to
`AbstractTopic`. This avoids the deadlock described above, because
`PersistentTopic#getNumberOfConsumers()` is now called before the
`PersistentSubscription` instance is locked.
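The idea behind this change can be sketched as follows. This is a simplified illustration, not the actual patch: the `Topic` and `Subscription` classes, the `maxConsumersPerTopic` field, and the `subscribe` signature are all hypothetical, and only the names `isConsumersExceededOnTopic()` and `getNumberOfConsumers()` are taken from the description above. The point it shows is that the topic-wide consumer count is read before the subscription's monitor is entered, so the subscribe path never walks the subscriptions map while holding that monitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerLimitSketch {
    /** Hypothetical stand-in for a subscription guarded by its own monitor. */
    static class Subscription {
        private final List<String> consumers = new ArrayList<>();

        synchronized void addConsumer(String consumer) {
            // No call back into the topic's subscription map from here.
            consumers.add(consumer);
        }

        synchronized int size() {
            return consumers.size();
        }
    }

    /** Hypothetical stand-in for a topic that owns the limit check. */
    static class Topic {
        private final Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();
        private final int maxConsumersPerTopic; // assumed: 0 means unlimited

        Topic(int maxConsumersPerTopic) {
            this.maxConsumersPerTopic = maxConsumersPerTopic;
        }

        int getNumberOfConsumers() {
            int total = 0;
            for (Subscription s : subscriptions.values()) {
                total += s.size();
            }
            return total;
        }

        boolean isConsumersExceededOnTopic() {
            return maxConsumersPerTopic > 0 && getNumberOfConsumers() >= maxConsumersPerTopic;
        }

        /** Returns false if the topic-wide consumer limit is already reached. */
        boolean subscribe(String subscriptionName, String consumer) {
            // The check runs first, while no subscription monitor is held.
            if (isConsumersExceededOnTopic()) {
                return false;
            }
            Subscription sub = subscriptions.computeIfAbsent(subscriptionName, k -> new Subscription());
            sub.addConsumer(consumer);
            return true;
        }
    }
}
```

One trade-off of this ordering, at least in the sketch, is that the limit check and the add are no longer a single atomic step, so concurrent subscribers could briefly exceed the limit.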