rdhabalia opened a new issue, #22840: URL: https://github.com/apache/pulsar/issues/22840
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

> 2.10

### Minimal reproduce step

1. Restart the broker.
2. Hit the race condition in [BookieRackAffinityMapping](https://github.com/apache/pulsar/blob/branch-3.3/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java#L122), whose blocking call holds the lock on the managed-ledger opening execution path at [ManagedLedgerFactoryImpl](https://github.com/apache/pulsar/blob/branch-3.3/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L377)

```
"pulsar-io-4-32" #166 prio=5 os_prio=0 cpu=429.42ms elapsed=701.77s tid=0x00007facc80314b0 nid=0x3881 waiting on condition [0x00007facc73f4000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for <0x00001000f838ac60> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
    at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
    at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
    at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
    at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
    at org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.setConf(BookieRackAffinityMapping.java:121)
    - locked <0x00001000f8389698> (a org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping)
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.initialize(RackawareEnsemblePlacementPolicyImpl.java:265)
    at org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:105)
    at org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:51)
    at org.apache.bookkeeper.client.BookKeeper.initializeEnsemblePlacementPolicy(BookKeeper.java:581)
    at org.apache.bookkeeper.client.BookKeeper.<init>(BookKeeper.java:505)
    at org.apache.bookkeeper.client.BookKeeper$Builder.build(BookKeeper.java:306)
    at org.apache.pulsar.broker.BookKeeperClientFactoryImpl.create(BookKeeperClientFactoryImpl.java:87)
    at org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$0(ManagedLedgerClientFactory.java:95)
    at org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$1006/0x00007facb41b1440.apply(Unknown Source)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
    - locked <0x00001000f3c03a28> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
    at org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$1(ManagedLedgerClientFactory.java:93)
    at org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$202/0x00007fae1e73e9b0.get(Unknown Source)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$6(ManagedLedgerFactoryImpl.java:369)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$660/0x00007facb91ecd60.apply(Unknown Source)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
    - locked <0x00001000f4a02bc0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
    at org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
    at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
    at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
    at org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
    at org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
    at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
    at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
    at org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
    at org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniRunNow([email protected]/CompletableFuture.java:815)
    at java.util.concurrent.CompletableFuture.uniRunStage([email protected]/CompletableFuture.java:799)
    at java.util.concurrent.CompletableFuture.thenRun([email protected]/CompletableFuture.java:2121)
    at org.apache.pulsar.broker.service.BrokerService.loadOrCreatePersistentTopic(BrokerService.java:1433)
    at org.apache.pulsar.broker.service.BrokerService.lambda$getTopic$31(BrokerService.java:1014)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$476/0x00007facc3057108.apply(Unknown Source)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:404)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:238)
    at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:1013)
    at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:978)
    at org.apache.pulsar.broker.service.BrokerService.lambda$getOrCreateTopic$29(BrokerService.java:973)
```

3. At the same time, the metadata-store callback thread waits on the same lock while loading a topic and opening a managed ledger, and all other IO threads wait on the lock taken by the metadata-store callback thread.

```
"metadata-store-10-1" #25 prio=5 os_prio=0 cpu=7574.38ms elapsed=703.54s tid=0x00007fafdd884ed0 nid=0x37eb waiting for monitor entry [0x00007fae8f9d3000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1723)
    - waiting to lock <0x00001000f4a02bc0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
    at org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
    at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
    at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
    at org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
    at org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
    at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
    at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
    at org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
    at org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$acquireLock$1(LockManagerImpl.java:105)
    at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl$$Lambda$613/0x00007facbf1d9d08.run(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquire$2(ResourceLockImpl.java:128)
    at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$611/0x00007facbdc40040.run(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquireWithNoRevalidation$6(ResourceLockImpl.java:167)
    at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$609/0x00007facbdc42908.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:225)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:182)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$160/0x00007fae8db950b0.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
    at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:304)
    at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run([email protected]/Thread.java:829)
```

4. This creates a deadlock across all threads in the broker, and the broker stops responding and serving requests.

```
"pulsar-io-4-4" #138 prio=5 os_prio=0 cpu=724.17ms elapsed=702.07s tid=0x00007facc800c320 nid=0x3860 waiting on condition [0x00007fad3dc8c000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for <0x000010001b7478f8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
    at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
    at java.util.concurrent.locks.StampedLock.acquireRead([email protected]/StampedLock.java:1429)
    at java.util.concurrent.locks.StampedLock.readLock([email protected]/StampedLock.java:539)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.get(ConcurrentOpenHashMap.java:349)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.get(ConcurrentOpenHashMap.java:213)
    at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:984)
    at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:978)
    at org.apache.pulsar.broker.service.BrokerService.lambda$getOrCreateTopic$29(BrokerService.java:973)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$516/0x00007facc32a20b0.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1106)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2235)
    at org.apache.pulsar.broker.service.BrokerService.getOrCreateTopic(BrokerService.java:973)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleProducer$30(ServerCnx.java:1304)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$515/0x00007facc32a5440.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniApplyNow([email protected]/CompletableFuture.java:680)
    at java.util.concurrent.CompletableFuture.uniApplyStage([email protected]/CompletableFuture.java:658)
    at java.util.concurrent.CompletableFuture.thenApply([email protected]/CompletableFuture.java:2094)
    at org.apache.pulsar.broker.service.ServerCnx.handleProducer(ServerCnx.java:1260)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:195)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1257)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1297)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
```

### What did you expect to see?

This issue must be solved by addressing three different problems in the execution path:

**1. Bottleneck at the SINGLE metadata-store callback thread**

Having a single metadata-store callback thread can easily cause a deadlock whenever a regression bug is introduced. Therefore, we must have multiple callback threads, or as many callback threads as IO threads, so that the callback thread is not a bottleneck and does not cause a deadlock in the system.

**2. Make BookKeeper client creation async**

The BookKeeper client creation path has a blocking call that blocks managed-ledger creation, which eventually causes the deadlock. Therefore, managed-ledger creation must create the bk-client asynchronously.

**3. Prevent waiting forever on blocking bk-client creation**

Currently the bk-client makes multiple blocking calls to the metadata store, and none of them has a timeout. Introducing a timeout and retry helps the bk-client break the deadlock and successfully complete the execution path.
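To make points 2 and 3 concrete, here is a minimal sketch of the general idea, not a proposed patch. `AsyncClientCache`, `getOrCreate`, `creationExecutor` and `timeoutMillis` are hypothetical names that do not exist in Pulsar or BookKeeper. The sketch assumes the blocking client construction can be wrapped in a `Supplier`, and it relies only on JDK calls (`CompletableFuture.supplyAsync`, `orTimeout` from Java 9+, `ConcurrentHashMap.computeIfAbsent`). The map stores a future instead of the client, so the mapping function returns immediately and the bin lock is never held across a blocking call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Pulsar or BookKeeper.
final class AsyncClientCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final Executor creationExecutor; // assumed: a pool separate from IO and callback threads
    private final long timeoutMillis;        // assumed: upper bound on one creation attempt

    AsyncClientCache(Executor creationExecutor, long timeoutMillis) {
        this.creationExecutor = creationExecutor;
        this.timeoutMillis = timeoutMillis;
    }

    CompletableFuture<V> getOrCreate(K key, Supplier<V> blockingFactory) {
        // The mapping function only schedules work and returns a future,
        // so the map's internal lock is released right away.
        CompletableFuture<V> future = cache.computeIfAbsent(key, k ->
                CompletableFuture.supplyAsync(blockingFactory, creationExecutor)
                        // Fails the future with TimeoutException after the deadline.
                        // It does not interrupt the thread still running the factory.
                        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS));
        // Registered outside computeIfAbsent so the map is never modified from
        // inside its own mapping function. A failed entry is dropped so that a
        // later call can retry the creation.
        future.whenComplete((client, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }
}
```

A caller would chain on the returned future (for example with `thenCompose`) rather than calling `get()` on it from an IO or metadata-store callback thread. A timed-out attempt only fails the future, so the underlying blocking call would still need its own timeout to release the creation thread.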
### What did you see instead?

deadlock

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
