rdhabalia opened a new issue, #22840:
URL: https://github.com/apache/pulsar/issues/22840

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   > 2.10
   
   ### Minimal reproduce step
   
   1. Restart the broker.
   2. Trigger the race condition: [BookieRackAffinityMapping](https://github.com/apache/pulsar/blob/branch-3.3/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java#L122) makes a blocking call while holding its lock, which stalls the managed-ledger opening path at [ManagedLedgerFactoryImpl](https://github.com/apache/pulsar/blob/branch-3.3/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L377).
   
   ```
   "pulsar-io-4-32" #166 prio=5 os_prio=0 cpu=429.42ms elapsed=701.77s 
tid=0x00007facc80314b0 nid=0x3881 waiting on condition  [0x00007facc73f4000]
      java.lang.Thread.State: WAITING (parking)
           at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
           - parking to wait for  <0x00001000f838ac60> (a 
java.util.concurrent.CompletableFuture$Signaller)
           at 
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
           at 
java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
           at 
java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
           at 
java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
           at 
java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
           at 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.setConf(BookieRackAffinityMapping.java:121)
           - locked <0x00001000f8389698> (a 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping)
           at 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.initialize(RackawareEnsemblePlacementPolicyImpl.java:265)
           at 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:105)
           at 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:51)
           at 
org.apache.bookkeeper.client.BookKeeper.initializeEnsemblePlacementPolicy(BookKeeper.java:581)
           at 
org.apache.bookkeeper.client.BookKeeper.<init>(BookKeeper.java:505)
           at 
org.apache.bookkeeper.client.BookKeeper$Builder.build(BookKeeper.java:306)
           at 
org.apache.pulsar.broker.BookKeeperClientFactoryImpl.create(BookKeeperClientFactoryImpl.java:87)
           at 
org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$0(ManagedLedgerClientFactory.java:95)
           at 
org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$1006/0x00007facb41b1440.apply(Unknown
 Source)
           at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
           - locked <0x00001000f3c03a28> (a 
java.util.concurrent.ConcurrentHashMap$ReservationNode)
           at 
org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$1(ManagedLedgerClientFactory.java:93)
           at 
org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$202/0x00007fae1e73e9b0.get(Unknown
 Source)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$6(ManagedLedgerFactoryImpl.java:369)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$660/0x00007facb91ecd60.apply(Unknown
 Source)
           at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
           - locked <0x00001000f4a02bc0> (a 
java.util.concurrent.ConcurrentHashMap$ReservationNode)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
           at 
java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
           at 
java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
           at 
org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
           at 
java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
           at 
java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
           at 
org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniRunNow([email protected]/CompletableFuture.java:815)
           at 
java.util.concurrent.CompletableFuture.uniRunStage([email protected]/CompletableFuture.java:799)
           at 
java.util.concurrent.CompletableFuture.thenRun([email protected]/CompletableFuture.java:2121)
           at 
org.apache.pulsar.broker.service.BrokerService.loadOrCreatePersistentTopic(BrokerService.java:1433)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$getTopic$31(BrokerService.java:1014)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$476/0x00007facc3057108.apply(Unknown
 Source)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:404)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:238)
           at 
org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:1013)
           at 
org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:978)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$getOrCreateTopic$29(BrokerService.java:973)
   ```
   
   
   3. At the same time, the metadata-store callback thread blocks on the same lock while loading a topic and opening its managed ledger, and all other IO threads wait on the lock held by the metadata-store callback thread.
   
   ```
   "metadata-store-10-1" #25 prio=5 os_prio=0 cpu=7574.38ms elapsed=703.54s 
tid=0x00007fafdd884ed0 nid=0x37eb waiting for monitor entry  
[0x00007fae8f9d3000]
      java.lang.Thread.State: BLOCKED (on object monitor)
           at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1723)
           - waiting to lock <0x00001000f4a02bc0> (a 
java.util.concurrent.ConcurrentHashMap$ReservationNode)
           at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
           at 
java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
           at 
java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
           at 
org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
           at 
java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
           at 
java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
           at 
org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
           at 
java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
           at 
java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
           at 
org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$acquireLock$1(LockManagerImpl.java:105)
           at 
org.apache.pulsar.metadata.coordination.impl.LockManagerImpl$$Lambda$613/0x00007facbf1d9d08.run(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
           at 
java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
           at 
java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
           at 
org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquire$2(ResourceLockImpl.java:128)
           at 
org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$611/0x00007facbdc40040.run(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
           at 
java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
           at 
java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
           at 
org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquireWithNoRevalidation$6(ResourceLockImpl.java:167)
           at 
org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$609/0x00007facbdc42908.accept(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
           at 
java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
           at 
java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
           at 
org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:225)
           at 
org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:182)
           at 
org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$160/0x00007fae8db950b0.run(Unknown
 Source)
           at 
java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
           at 
java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:304)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run([email protected]/Thread.java:829)
   ```
   
   4. This deadlocks all threads in the broker, and the broker stops responding to and serving requests.
   
   ```
   "pulsar-io-4-4" #138 prio=5 os_prio=0 cpu=724.17ms elapsed=702.07s 
tid=0x00007facc800c320 nid=0x3860 waiting on condition  [0x00007fad3dc8c000]
      java.lang.Thread.State: WAITING (parking)
           at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
           - parking to wait for  <0x000010001b7478f8> (a 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
           at 
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
           at 
java.util.concurrent.locks.StampedLock.acquireRead([email protected]/StampedLock.java:1429)
           at 
java.util.concurrent.locks.StampedLock.readLock([email protected]/StampedLock.java:539)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.get(ConcurrentOpenHashMap.java:349)
           at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.get(ConcurrentOpenHashMap.java:213)
           at 
org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:984)
           at 
org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:978)
           at 
org.apache.pulsar.broker.service.BrokerService.lambda$getOrCreateTopic$29(BrokerService.java:973)
           at 
org.apache.pulsar.broker.service.BrokerService$$Lambda$516/0x00007facc32a20b0.apply(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1106)
           at 
java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2235)
           at 
org.apache.pulsar.broker.service.BrokerService.getOrCreateTopic(BrokerService.java:973)
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$handleProducer$30(ServerCnx.java:1304)
           at 
org.apache.pulsar.broker.service.ServerCnx$$Lambda$515/0x00007facc32a5440.apply(Unknown
 Source)
           at 
java.util.concurrent.CompletableFuture.uniApplyNow([email protected]/CompletableFuture.java:680)
           at 
java.util.concurrent.CompletableFuture.uniApplyStage([email protected]/CompletableFuture.java:658)
           at 
java.util.concurrent.CompletableFuture.thenApply([email protected]/CompletableFuture.java:2094)
           at 
org.apache.pulsar.broker.service.ServerCnx.handleProducer(ServerCnx.java:1260)
           at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:195)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at 
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
           at 
io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
           at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
           at 
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1257)
           at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1297)
           at 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
           at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
           at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
           at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
   ```
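
The pattern in the three traces above can be reproduced in miniature. The following is a simplified sketch, not Pulsar code: the class, the map, the key, and the 500 ms timeout are all hypothetical, and the timeout exists only so the demo terminates instead of hanging. One thread waits on a future from inside a `ConcurrentHashMap.computeIfAbsent` mapping function, while the only thread that could complete that future is stuck entering `computeIfAbsent` on the same bin.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical miniature of the reported lock cycle; not Pulsar code. */
final class ComputeIfAbsentDeadlockSketch {

    /** Returns true if the wait inside the mapping function timed out. */
    static boolean blockingWaitTimesOut() {
        ConcurrentHashMap<String, String> ledgers = new ConcurrentHashMap<>();
        // Stand-in for the single metadata-store callback thread.
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        // Stand-in for the result the blocking call is waiting for.
        CompletableFuture<String> rackInfo = new CompletableFuture<>();
        CountDownLatch insideMappingFunction = new CountDownLatch(1);
        AtomicBoolean timedOut = new AtomicBoolean(false);
        try {
            Future<Object> callback = callbackThread.submit(() -> {
                insideMappingFunction.await();
                // Blocks here: the bin for this key is reserved by the caller below.
                ledgers.computeIfAbsent("topic", k -> "opened-by-callback");
                // Only reached once the caller below has released the bin.
                rackInfo.complete("rack-1");
                return null;
            });
            // Stand-in for an IO thread: waits for the callback while holding the bin.
            ledgers.computeIfAbsent("topic", k -> {
                insideMappingFunction.countDown();
                try {
                    // With a plain get() this wait would never end.
                    return rackInfo.get(500, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    timedOut.set(true);
                    return "opened-after-timeout";
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            callback.get(5, TimeUnit.SECONDS);
            return timedOut.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking wait timed out: " + blockingWaitTimesOut());
    }
}
```

The sketch uses the same key on both threads to keep the collision deterministic. Replacing the timed `get` with an untimed one turns the demo into a permanent hang, which is the situation the broker ends up in.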
   
   ### What did you expect to see?
   
   This issue should be solved by addressing three separate problems in the execution path:
   
   **1. Bottleneck at the SINGLE metadata-store callback thread**
   With a single metadata-store callback thread, any regression that blocks that thread can easily deadlock the broker. We should therefore run multiple callback threads, or as many callback threads as IO threads, so that the callback thread is not a bottleneck and cannot cause a deadlock in the system.
   
   **2. Make BookKeeper client creation async**
   The BookKeeper client creation path contains a blocking call that blocks managed-ledger creation and eventually causes the deadlock. Managed-ledger creation must therefore create the bk-client asynchronously.
   
   **3. Prevent waiting forever on blocking bk-client creation**
   Currently the bk-client makes multiple blocking calls to the metadata store, none of which has a timeout. Introducing a timeout and retry lets the bk-client break the deadlock and complete the execution path successfully.
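
To make points 2 and 3 concrete, here is a minimal sketch of one possible shape for the fix. It is an illustration under stated assumptions, not the actual patch: `AsyncClientCacheSketch`, `getOrCreate`, the executor, the timeout, and the attempt limit are hypothetical names and values. The map stores a future rather than the client, so nothing blocks inside the map operation, and each creation attempt is bounded with `CompletableFuture.orTimeout` and retried.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper; not part of Pulsar or BookKeeper. */
final class AsyncClientCacheSketch<C> {
    private final ConcurrentHashMap<String, CompletableFuture<C>> clients = new ConcurrentHashMap<>();
    private final Executor creationExecutor;
    private final Duration timeout;
    private final int maxAttempts;

    AsyncClientCacheSketch(Executor creationExecutor, Duration timeout, int maxAttempts) {
        this.creationExecutor = creationExecutor;
        this.timeout = timeout;
        this.maxAttempts = maxAttempts;
    }

    /** Never blocks the caller: registers a future, then creates the client elsewhere. */
    CompletableFuture<C> getOrCreate(String key, Supplier<C> blockingFactory) {
        CompletableFuture<C> placeholder = new CompletableFuture<>();
        CompletableFuture<C> existing = clients.putIfAbsent(key, placeholder);
        if (existing != null) {
            return existing;
        }
        attempt(blockingFactory, 1).whenComplete((client, error) -> {
            if (error != null) {
                clients.remove(key, placeholder); // let a later caller start over
                placeholder.completeExceptionally(error);
            } else {
                placeholder.complete(client);
            }
        });
        return placeholder;
    }

    private CompletableFuture<C> attempt(Supplier<C> blockingFactory, int attemptNo) {
        // orTimeout fails the future but does not interrupt the blocked factory thread,
        // so the executor needs spare threads for retries.
        CompletableFuture<C> timed = CompletableFuture.supplyAsync(blockingFactory, creationExecutor)
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
        CompletableFuture<CompletableFuture<C>> next = timed.handle((client, error) -> {
            if (error == null) {
                return CompletableFuture.completedFuture(client);
            }
            if (attemptNo >= maxAttempts) {
                return CompletableFuture.<C>failedFuture(error);
            }
            return attempt(blockingFactory, attemptNo + 1);
        });
        return next.thenCompose(f -> f);
    }

    /** Demo: the first attempt hangs, a later attempt succeeds. */
    static String demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger calls = new AtomicInteger();
        try {
            AsyncClientCacheSketch<String> cache =
                    new AsyncClientCacheSketch<>(pool, Duration.ofMillis(200), 3);
            return cache.getOrCreate("bk-client", () -> {
                if (calls.incrementAndGet() == 1) {
                    sleepQuietly(5_000); // simulates a metadata-store call that never returns
                }
                return "client";
            }).join();
        } finally {
            pool.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The timeout only abandons the wait. A real fix would also need the underlying metadata-store call to be cancellable or non-blocking, since each timed-out attempt in this sketch keeps holding a thread until it returns.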
   
   ### What did you see instead?
   
   A deadlock: the broker stops responding.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

