massakam opened a new pull request #5139: [pulsar-broker] Fix deadlock when resetting cursor
URL: https://github.com/apache/pulsar/pull/5139
 
 
   ### Motivation
   
   A deadlock occurred in one of our production brokers:
   
   ```
   Found one Java-level deadlock:
   =============================
   "ForkJoinPool.commonPool-worker-0":
     waiting to lock monitor 0x00007f22fc01cfd8 (object 0x00007f25f7a706e0, a org.apache.pulsar.broker.service.persistent.PersistentSubscription),
     which is held by "pulsar-io-19-26"
   "pulsar-io-19-26":
     waiting to lock monitor 0x00007f1f9c12b578 (object 0x00007f25f7a2b350, a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers),
     which is held by "bookkeeper-ml-workers-OrderedExecutor-4-0"
   "bookkeeper-ml-workers-OrderedExecutor-4-0":
     waiting to lock monitor 0x00007f22fc01cfd8 (object 0x00007f25f7a706e0, a org.apache.pulsar.broker.service.persistent.PersistentSubscription),
     which is held by "pulsar-io-19-26"
   
   Java stack information for the threads listed above:
   ===================================================
   "ForkJoinPool.commonPool-worker-0":
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:100)
           - waiting to lock <0x00007f25f7a706e0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$9(PersistentTopic.java:564)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$415/1188846689.accept(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
           at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
           at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:560)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$9(ServerCnx.java:613)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$380/1267675078.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
           at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:596)
           at org.apache.pulsar.broker.service.ServerCnx$$Lambda$378/1599958909.apply(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
           at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
           at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
           at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   "pulsar-io-19-26":
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.redeliverUnacknowledgedMessages(PersistentDispatcherMultipleConsumers.java:570)
           - waiting to lock <0x00007f25f7a2b350> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.redeliverUnacknowledgedMessages(PersistentSubscription.java:656)
           - locked <0x00007f25f7a706e0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
           at org.apache.pulsar.broker.service.Consumer.redeliverUnacknowledgedMessages(Consumer.java:657)
           at org.apache.pulsar.broker.service.ServerCnx.handleRedeliverUnacknowledged(ServerCnx.java:1023)
           at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:244)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
           at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
           at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
           at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1436)
           at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1215)
           at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1249)
           at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
           at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
           at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
           at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
           at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
           at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
           at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)
           at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
           at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   "bookkeeper-ml-workers-OrderedExecutor-4-0":
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.removeConsumer(PersistentSubscription.java:144)
           - waiting to lock <0x00007f25f7a706e0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
           at org.apache.pulsar.broker.service.Consumer.close(Consumer.java:364)
           at org.apache.pulsar.broker.service.Consumer.disconnect(Consumer.java:372)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda$1152/1337268762.accept(Unknown Source)
           at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.disconnectAllConsumers(PersistentDispatcherMultipleConsumers.java:362)
           - locked <0x00007f25f7a2b350> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.resetCursor(PersistentSubscription.java:393)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.access$100(PersistentSubscription.java:62)
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription$5.findEntryComplete(PersistentSubscription.java:361)
           at org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findEntryComplete(PersistentMessageFinder.java:105)
           at org.apache.bookkeeper.mledger.impl.OpFindNewest.readEntryComplete(OpFindNewest.java:67)
           at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:205)
           at org.apache.bookkeeper.mledger.impl.EntryCacheImpl$$Lambda$1039/1389130675.accept(Unknown Source)
           at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
           at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   
   Found 1 deadlock.
   ```
   
   The `pulsar-io` thread locks the `PersistentSubscription` object and then tries to lock the `PersistentDispatcherMultipleConsumers` object in order to redeliver unacked messages. Meanwhile, the `bookkeeper-ml-workers-OrderedExecutor` thread, which is resetting the cursor, has already locked the `PersistentDispatcherMultipleConsumers` object and is trying to lock the `PersistentSubscription` object. The two threads acquire the same pair of locks in opposite order, so each waits for the other indefinitely. The `ForkJoinPool.commonPool-worker-0` thread, which is adding a consumer, is blocked as well because it also needs the `PersistentSubscription` lock.
   
   ### Modifications
   
   Changed `PersistentSubscription` to lock itself before locking the `Dispatcher` object when resetting the cursor, so that the cursor-reset path and the redelivery path acquire the two locks in the same order.
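
The lock-ordering idea behind this change can be illustrated with a minimal, self-contained sketch. It is not the actual patch: `Subscription`, `Dispatcher`, and `runConcurrently` are hypothetical stand-ins for the broker classes, and only the method names are borrowed from the stack traces above. The sketch assumes the fix amounts to taking the subscription monitor first in `resetCursor`, so both paths lock subscription, then dispatcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LockOrderSketch {
    // Hypothetical stand-in for PersistentDispatcherMultipleConsumers.
    static class Dispatcher {
        int redeliveries = 0;

        synchronized void redeliverUnacknowledgedMessages() {
            redeliveries++;
        }

        // Holds the dispatcher monitor while calling back into the subscription.
        synchronized void disconnectAllConsumers(Subscription sub) {
            sub.removeConsumer();
        }
    }

    // Hypothetical stand-in for PersistentSubscription.
    static class Subscription {
        final Dispatcher dispatcher = new Dispatcher();
        int removals = 0;

        // Redelivery path: subscription lock first, then dispatcher lock.
        synchronized void redeliverUnacknowledgedMessages() {
            dispatcher.redeliverUnacknowledgedMessages();
        }

        synchronized void removeConsumer() {
            removals++;
        }

        // Reset path: take the subscription lock BEFORE the dispatcher lock.
        // Without the synchronized block, this path would lock the dispatcher
        // first and the subscription second (inside removeConsumer), which is
        // the inverted order seen in the thread dump.
        void resetCursor() {
            synchronized (this) {
                // Java monitors are reentrant, so removeConsumer() can
                // re-acquire the subscription lock already held here.
                dispatcher.disconnectAllConsumers(this);
            }
        }
    }

    // Runs both paths concurrently; returns true if both threads finish in time.
    static boolean runConcurrently(int iterations) {
        Subscription sub = new Subscription();
        CountDownLatch done = new CountDownLatch(2);
        Thread io = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sub.redeliverUnacknowledgedMessages();
            done.countDown();
        });
        Thread ml = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sub.resetCursor();
            done.countDown();
        });
        io.setDaemon(true);
        ml.setDaemon(true);
        io.start();
        ml.start();
        try {
            return done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(100_000) ? "completed" : "timed out");
    }
}
```

Because every thread that needs both monitors now takes the subscription monitor first, no thread can hold the dispatcher monitor while waiting for the subscription monitor, which removes the cycle reported in the dump.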

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services