devinbost commented on issue #6054:
URL: https://github.com/apache/pulsar/issues/6054#issuecomment-830477787


   Update on findings:
   
   We discovered that this issue is multi-faceted. 
   
   1. The first issue was that the subscription was getting stuck in a state where permits > 0, pendingReadOps = 0, and the cursor's read position was not advancing despite an existing backlog. That issue was mitigated by https://github.com/apache/pulsar/pull/9789 , though the root cause of why the cursor wasn't advancing is still unknown. It's possible that some code branch that should be calling `PersistentDispatcherMultipleConsumers.readMoreEntries()` isn't doing so. We also discovered that `readMoreEntries()` wasn't synchronized, so concurrent invocations could race with each other, either preventing reads or corrupting the permit accounting. Any such concurrency issues should be resolved now by https://github.com/apache/pulsar/pull/10413 (see the sketch below). After those changes, I wasn't able to reproduce the frozen subscription except when consuming from a partitioned topic with a Pulsar function running several parallel instances.
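
   For context, here is a minimal sketch of the kind of serialization #10413 introduces. This is an illustration only, not Pulsar's actual dispatcher: `permits`, `pendingReadOps`, and the cursor interaction are simplified stand-ins.

```java
// Simplified illustration only: serializing readMoreEntries() so that concurrent
// triggers (acks, new permits, read callbacks) cannot interleave and leave the
// cursor stalled with permits > 0 and pendingReadOps == 0.
class SimplifiedDispatcher {
    private int permits;        // consumer flow-control permits (stand-in)
    private int pendingReadOps; // outstanding cursor reads (stand-in)

    // synchronized so only one thread at a time decides whether to issue a read
    synchronized void readMoreEntries() {
        if (permits <= 0 || pendingReadOps > 0) {
            return; // nothing to dispatch, or a read is already in flight
        }
        pendingReadOps++;
        issueCursorReadAsync(permits);
    }

    // called from the cursor's read callback
    synchronized void readEntriesComplete(int dispatchedMessages) {
        pendingReadOps--;
        permits -= dispatchedMessages;
        readMoreEntries(); // re-check: more backlog and permits may be available
    }

    // called when a consumer sends a flow request
    synchronized void consumerFlow(int additionalPermits) {
        permits += additionalPermits;
        readMoreEntries();
    }

    private void issueCursorReadAsync(int maxEntries) {
        // placeholder for the cursor read (e.g. ManagedCursor#asyncReadEntriesOrWait)
    }
}
```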
   
   2. The second issue was that when consuming from a partitioned, persistent topic using a Pulsar function (on a shared subscription) with parallelism > 1, messages were not being dispatched properly. There were two parts to that issue:
   
   2.1. The first part prevented messages from being dispatched to functions that still had permits > 0. The dispatcher was failing to dispatch to consumers that were consuming faster than the others (e.g. where one consumer had permits > 0 while the others had permits <= 0), so the faster consumers just sat idle, not processing messages, until the other consumers caught up. That issue was fixed by https://github.com/apache/pulsar/pull/10417 (a simplified sketch of the consumer-selection idea follows below).
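
   As an illustration only (not the actual dispatcher code; the real logic lives in `PersistentDispatcherMultipleConsumers`, and the names here are simplified stand-ins), the selection logic conceptually needs to skip consumers without permits rather than stalling on them:

```java
import java.util.List;

// Illustrative sketch of permit-aware round-robin consumer selection.
class ConsumerSelector {
    static class ConsumerStub {
        final String name;
        int availablePermits;
        ConsumerStub(String name, int availablePermits) {
            this.name = name;
            this.availablePermits = availablePermits;
        }
    }

    private int nextIndex = 0;

    /** Returns the next consumer that can accept messages, or null if none can. */
    ConsumerStub pickConsumer(List<ConsumerStub> consumers) {
        for (int i = 0; i < consumers.size(); i++) {
            ConsumerStub candidate = consumers.get((nextIndex + i) % consumers.size());
            if (candidate.availablePermits > 0) {
                nextIndex = (nextIndex + i + 1) % consumers.size();
                return candidate; // don't stall on consumers that have no permits
            }
        }
        return null; // every consumer is out of permits; wait for a flow request
    }
}
```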
   
   2.2. The second part of the issue is that, due to the current message-batching behavior, too many messages were being dispatched at once, which drove the dispatcher's permit count negative. That issue will be fixed by #7266 (see the worked example below).
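
   To make the arithmetic concrete (illustrative numbers only): permits are granted per message, but the broker dispatches whole batch entries, so the last entry can overshoot the remaining permits:

```java
// Illustrative arithmetic only: permits are counted per message, but the
// dispatcher hands out whole batch entries, so the last entry can overshoot.
public class NegativePermitsExample {
    public static void main(String[] args) {
        int permits = 10;          // consumer advertised 10 message permits
        int messagesPerBatch = 8;  // producer batched 8 messages per entry

        // Dispatching 2 entries delivers 16 messages against 10 permits.
        for (int entry = 0; entry < 2 && permits > 0; entry++) {
            permits -= messagesPerBatch;
        }
        System.out.println("permits = " + permits); // prints permits = -6
    }
}
```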
   
   3. After applying #10417, #10413, and #9789 in a custom build of Pulsar, we resolved the dispatching issues and uncovered a new problem: when consuming from a partitioned, persistent topic using a Pulsar function (on a shared subscription) with parallelism > 1, functions with permits <= 0 sit idle even though they have received messages from the broker into the `incomingMessages` queue on `ConsumerBase`.
   A thread dump suggests the functions are stuck waiting to acquire a semaphore in `ProducerImpl.sendAsync()`:
   
   ```
   "myTenant/myNamespace/function-filter-0" prio=5 tid=32 WAITING
       at sun.misc.Unsafe.park(Native Method)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
          local variable: 
java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#40
          local variable: 
java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#41
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
          local variable: java.util.concurrent.Semaphore$FairSync#1
       at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
       at 
org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:748)
       at 
org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:391)
          local variable: org.apache.pulsar.client.impl.ProducerImpl#1
          local variable: org.apache.pulsar.client.impl.ProducerImpl$1#1
          local variable: io.netty.buffer.UnpooledHeapByteBuf#17
          local variable: org.apache.pulsar.common.api.proto.MessageMetadata#83
          local variable: org.apache.pulsar.client.impl.MessageImpl#82
       at 
org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:290)
          local variable: java.util.concurrent.CompletableFuture#33
       at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
       at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtLeastOnceProcessor.sendOutputMessage(PulsarSink.java:274)
          local variable: 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtLeastOnceProcessor#1
          local variable: org.apache.pulsar.functions.instance.SinkRecord#2
       at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:393)
       at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:349)
       at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:331)
       at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable$$Lambda$193.accept(<unknown
 string>)
       at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
          local variable: 
org.apache.pulsar.functions.instance.JavaExecutionResult#1
       at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
          local variable: 
org.apache.pulsar.functions.instance.JavaInstanceRunnable$$Lambda$193#1
          local variable: java.util.concurrent.CompletableFuture#35
       at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
       at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:322)
       at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:275)
          local variable: 
org.apache.pulsar.functions.instance.JavaInstanceRunnable#1
          local variable: java.util.concurrent.CompletableFuture#34
       at java.lang.Thread.run(Thread.java:748)
   ```
   After disabling batching on the function's producer in a custom build, the issue disappeared. However, performance is severely degraded with batching disabled, so that's not a viable workaround.
   The blocking itself is expected: functions create their output producer with `blockIfQueueFull(true)` by default, so the semaphore parks the thread when the producer's pending-message queue is full. That implies the producer queue is indeed full.
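
   For reference, here is a hedged sketch of roughly how the sink producer is configured (simplified from `PulsarSink`; the exact builder calls and values in the version under test may differ, and the service URL and topic are placeholders). It shows the knobs involved: blocking on a full queue, the pending-message limit whose semaphore appears in the stack above, and the batching we disabled in the custom build.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

// Simplified approximation of the function sink's producer setup, for discussion only.
public class SinkProducerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder URL
                .build();

        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic("persistent://myTenant/myNamespace/output") // placeholder topic
                .blockIfQueueFull(true)       // default in functions: sendAsync() parks on the
                                              // semaphore seen in canEnqueueRequest() above
                .maxPendingMessages(1000)     // illustrative size of the queue guarded by that semaphore
                .enableBatching(true)         // setting this to false is what made the hang disappear
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .create();

        producer.close();
        client.close();
    }
}
```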
   
   So, the remaining mysteries are:
   - Why is the producer queue full? (Why is the function not getting acks from the broker?)
   - Or, if the function is getting acks from the broker, why is the pulsar-client not batching and sending?
   
   We also still need to confirm that the thread responsible for consuming is in fact the one blocked on producing; see the sketch below.
   (@codelipenghui or @jerrypeng, do either of you know?)
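
   One way to check that programmatically (a diagnostic sketch, not existing Pulsar tooling; it would have to run inside the function instance's JVM and simply automates what we read off the thread dump above):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

// Diagnostic sketch: find function instance threads that are parked inside
// ProducerImpl.canEnqueueRequest(), i.e. the consume path blocked on produce.
public class BlockedOnProduceCheck {
    public static void main(String[] args) {
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info == null) {
                continue;
            }
            boolean isFunctionThread = false;
            boolean blockedInProducer = false;
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().endsWith("JavaInstanceRunnable")) {
                    isFunctionThread = true;
                }
                if (frame.getClassName().endsWith("ProducerImpl")
                        && frame.getMethodName().equals("canEnqueueRequest")) {
                    blockedInProducer = true;
                }
            }
            if (isFunctionThread && blockedInProducer) {
                System.out.println("Function thread blocked on producer queue: " + info.getThreadName());
            }
        }
    }
}
```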
   
   Many thanks to @rdhabalia for helping to deep-dive into this issue.
   Also, thanks to @lhotari for helping with the concurrency side of things.

