thetumbled opened a new issue, #19568:
URL: https://github.com/apache/pulsar/issues/19568

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   2.9
   
   ### Minimal reproduce step
   
   While trying to add support for resending messages that have timed out, I modified the `PerformanceProducer` as shown below.
   
![image](https://user-images.githubusercontent.com/52550727/220042337-202d1419-8293-4617-8679-9c9f729d0a14.png)
   
   Soon afterwards I ran into a deadlock. I emulated message loss with the `tc` command, for example:
   ```
   tc qdisc add dev p2p1 root netem loss random 30%
   ```
   This command randomly drops 30% of the packets, so some of the acks the broker sends back to the producer are lost. As a consequence, messages in `pendingMessages` may time out, and the resulting TimeoutException triggers the resending logic I added above.
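
   For reference, the resending logic (visible only in the screenshot above) has roughly the following shape. This is a hypothetical reconstruction, not the actual `PerformanceProducer` change: `sendAsync` here is a self-contained stand-in that fails the first attempt with a TimeoutException, and the retry is issued from the exception callback, which is where the real client re-acquires the pending-messages semaphore.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class ResendSketch {
       static int attempts = 0;

       // Hypothetical stand-in for producer.sendAsync(msg): the first attempt
       // fails with a TimeoutException, the retry succeeds.
       static CompletableFuture<String> sendAsync(String msg) {
           CompletableFuture<String> f = new CompletableFuture<>();
           if (attempts++ == 0) {
               f.completeExceptionally(new TimeoutException("send timed out"));
           } else {
               f.complete("msgId-" + msg);
           }
           return f;
       }

       public static void main(String[] args) throws Exception {
           CompletableFuture<String> result = new CompletableFuture<>();
           // Retry-on-timeout wiring analogous to the modification described in
           // this issue: the exception callback issues a second sendAsync for
           // the same payload. In the real client that second send tries to
           // re-acquire the pending-messages semaphore, which is where the
           // deadlock described below begins.
           sendAsync("m1").whenComplete((id, ex) -> {
               if (ex != null) {
                   sendAsync("m1").whenComplete((id2, ex2) -> {
                       if (ex2 != null) result.completeExceptionally(ex2);
                       else result.complete(id2);
                   });
               } else {
                   result.complete(id);
               }
           });
           System.out.println(result.get(1, TimeUnit.SECONDS));
       }
   }
   ```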
   
   But soon the whole process got stuck. I dumped the thread stacks and found the cause of the problem.
   ```
   "pulsar-client-internal-5-1" #16 prio=5 os_prio=31 tid=0x000000012509d000 
nid=0x7a03 waiting for monitor entry [0x000000016e986000]
      java.lang.Thread.State: BLOCKED (on object monitor)
       at 
org.apache.pulsar.client.impl.**ProducerImpl.lambda$triggerBatchMessageAndSend**$16(ProducerImpl.java:1648)
       - waiting to lock <0x00000006c6ecab08> (a 
org.apache.pulsar.client.impl.ProducerImpl)
       at 
org.apache.pulsar.client.impl.ProducerImpl$$Lambda$91/1627650835.run(Unknown 
Source)
       at 
org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:53)
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
       at java.lang.Thread.run(Thread.java:750)
   ```
   The timer task for `triggerBatchMessageAndSend` is stuck because another thread holds the monitor of the `ProducerImpl` object. This timer task batches pending messages and sends them to the broker; when the broker acks them, **the producer releases the semaphore** that limits the size of `pendingMessages`.
   
   I then found that the thread below holds the `ProducerImpl` monitor while waiting on the very semaphore that can only be released by the thread above, which results in the deadlock.
   ```
   "pulsar-client-io-2-1" #14 prio=5 os_prio=31 tid=0x000000012516d000 
nid=0x5307 waiting on condition [0x000000016e56d000]
      java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00000006c6ece7a8> (a 
java.util.concurrent.Semaphore$FairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
       at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
       at 
org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:833)
       at 
org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:438)
       at 
org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:333)
       at 
org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:404)
       at 
org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:244)
       at 
org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:191)
       at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:104)
       at 
org.apache.pulsar.testclient.PerformanceProducer.lambda$runProducer$3(PerformanceProducer.java:966)
       at 
org.apache.pulsar.testclient.PerformanceProducer$$Lambda$98/1280550469.apply(Unknown
 Source)
       at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
       at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
       at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
       at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
       at 
org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:359)
       at 
org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1313)
       at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1860)
       at 
org.apache.pulsar.client.impl.ProducerImpl$$Lambda$103/1226908104.accept(Unknown
 Source)
       at java.lang.Iterable.forEach(Iterable.java:75)
       at 
org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1392)
       at 
org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1850)
       at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1882)
       - locked <0x00000006c6ecab08> (a 
org.apache.pulsar.client.impl.ProducerImpl)
       at 
org.apache.pulsar.client.impl.ProducerImpl$$Lambda$102/788075384.run(Unknown 
Source)
       at 
io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
       at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
       at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
       at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
       at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
       at java.lang.Thread.run(Thread.java:750)
   ```
   
   The stacks above explain the deadlock. When a message timeout occurs, the `pulsar-client-io` thread locks the `ProducerImpl` object, fails the pending messages, and completes their futures with a TimeoutException, which is handled in the `CompletableFuture.exceptionally` callback. Since I added the resending logic in that callback, **the `pulsar-client-io` thread tries to acquire the semaphore synchronously**; if the semaphore has no permits left, the thread blocks there. Meanwhile the timer task that would release the semaphore is itself blocked waiting for the `ProducerImpl` monitor, which is held by `pulsar-client-io`.
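
   The cycle can be sketched with plain `java.util.concurrent` primitives (a minimal stand-in, not the actual `ProducerImpl` code): one thread holds a monitor while acquiring a semaphore permit, and the only releaser needs that same monitor first. A blocking `acquire()` would hang forever, so the sketch uses `tryAcquire` with a timeout to make the cycle observable.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;

   public class DeadlockSketch {
       // Stands in for the ProducerImpl monitor.
       static final Object producerLock = new Object();
       // Stands in for the pendingMessages semaphore, with all permits consumed.
       static final Semaphore pendingPermits = new Semaphore(0);

       public static void main(String[] args) throws Exception {
           ExecutorService timer = Executors.newSingleThreadExecutor();

           // Plays pulsar-client-io: holds the monitor while waiting for a permit.
           synchronized (producerLock) {
               // Plays the triggerBatchMessageAndSend timer task: must enter the
               // same monitor before it can (indirectly) release a permit.
               timer.submit(() -> {
                   synchronized (producerLock) {
                       pendingPermits.release();
                   }
               });
               // With acquire() this is the deadlock; tryAcquire just times out.
               boolean acquired = pendingPermits.tryAcquire(1, TimeUnit.SECONDS);
               System.out.println("acquired while holding lock: " + acquired);
           }
           // Once the monitor is released, the "timer task" can proceed.
           timer.shutdown();
           timer.awaitTermination(5, TimeUnit.SECONDS);
           System.out.println("permits after lock released: "
                   + pendingPermits.availablePermits());
       }
   }
   ```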
   
   ### What did you expect to see?
   
   The producer keeps sending messages without errors.
   
   ### What did you see instead?
   
   The producing process gets stuck.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

