devinbost commented on issue #6054: URL: https://github.com/apache/pulsar/issues/6054#issuecomment-830477787
Update on findings: We discovered that this issue is multi-faceted.

1. The first issue was that the subscription was getting stuck in a state where permits > 0, pendingReadOps = 0, and the cursor's read position was not advancing despite an existing backlog. That issue was mitigated by https://github.com/apache/pulsar/pull/9789, though the root cause of why the cursor wasn't advancing is still unknown. It's possible that some code branch that should be calling `PersistentDispatcherMultipleConsumers.readMoreEntries()` isn't doing so. We also discovered that `readMoreEntries()` wasn't synchronized, which could allow concurrent calls to prevent reads or corrupt the permit accounting. Any such concurrency issues should be resolved now by https://github.com/apache/pulsar/pull/10413. After those changes, I wasn't able to reproduce the frozen subscription except when consuming from a partitioned topic using a Pulsar function with several parallel function instances.
2. The second issue is that when consuming from a partitioned, persistent topic using a Pulsar function (on a shared subscription) with parallelism > 1, messages were not being dispatched properly. There were two parts to that issue:
   - 2.1. Messages were not being dispatched to consumers that still had permits > 0. The dispatcher was failing to dispatch to consumers that were reading faster than the others (e.g. where one consumer had permits > 0 and the others had permits <= 0), so some consumers just sat idle and didn't process messages until the other consumers caught up. That issue was fixed by https://github.com/apache/pulsar/pull/10417.
   - 2.2. Due to current message-batching behavior, too many messages were being dispatched, which resulted in negative permits on the dispatcher (see the sketch after this list). That issue will be fixed by #7266.
3. After applying #10417, #10413, and #9789 in a custom build of Pulsar, we resolved the dispatching issues and discovered a new problem: when consuming from a partitioned, persistent topic using a Pulsar function (on a shared subscription) with parallelism > 1, functions with permits <= 0 sit idle despite having received messages from the broker into the `incomingMessages` queue on `ConsumerBase`.
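To make 2.2 concrete, here is a toy sketch of the accounting problem (the `ToyConsumer`/`ToyDispatcher` classes are made up for illustration and are not Pulsar's actual dispatcher code): permits are granted per message, but the dispatcher selects entries, and a single batched entry can carry many messages, so delivering it can push the permit count below zero.

```java
import java.util.List;

// Hypothetical classes for illustration only; not Pulsar's implementation.
class ToyConsumer {
    private int availablePermits;

    ToyConsumer(int permits) { this.availablePermits = permits; }

    int getAvailablePermits() { return availablePermits; }

    // Permits are decremented by the full batch size once the entry is delivered.
    void deliver(List<String> batchedEntry) {
        availablePermits -= batchedEntry.size();
    }
}

class ToyDispatcher {
    // The per-entry permit check sees permits > 0, but each entry may be a
    // batch of several messages, so permits can go negative after delivery.
    static void dispatch(ToyConsumer consumer, List<List<String>> entries) {
        for (List<String> entry : entries) {
            if (consumer.getAvailablePermits() <= 0) {
                break; // stop once permits are exhausted
            }
            consumer.deliver(entry);
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(5); // 5 permits granted
        List<List<String>> entries = List.of(      // two batched entries, 4 messages each
                List.of("m1", "m2", "m3", "m4"),
                List.of("m5", "m6", "m7", "m8"));
        dispatch(consumer, entries);
        // Prints -3: 8 messages were delivered against 5 permits.
        System.out.println("permits after dispatch: " + consumer.getAvailablePermits());
    }
}
```

That negative-permit state is what 2.2 describes.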
A thread dump revealed that the functions appear to be stuck waiting for the release of a semaphore during `ProducerImpl.sendAsync()`:
```
"myTenant/myNamespace/function-filter-0" prio=5 tid=32 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
      local variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#40
      local variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#41
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
      local variable: java.util.concurrent.Semaphore$FairSync#1
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:748)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:391)
      local variable: org.apache.pulsar.client.impl.ProducerImpl#1
      local variable: org.apache.pulsar.client.impl.ProducerImpl$1#1
      local variable: io.netty.buffer.UnpooledHeapByteBuf#17
      local variable: org.apache.pulsar.common.api.proto.MessageMetadata#83
      local variable: org.apache.pulsar.client.impl.MessageImpl#82
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:290)
      local variable: java.util.concurrent.CompletableFuture#33
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
    at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtLeastOnceProcessor.sendOutputMessage(PulsarSink.java:274)
      local variable: org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtLeastOnceProcessor#1
      local variable: org.apache.pulsar.functions.instance.SinkRecord#2
    at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:393)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:349)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:331)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable$$Lambda$193.accept(<unknown string>)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      local variable: org.apache.pulsar.functions.instance.JavaExecutionResult#1
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
      local variable: org.apache.pulsar.functions.instance.JavaInstanceRunnable$$Lambda$193#1
      local variable: java.util.concurrent.CompletableFuture#35
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:322)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:275)
      local variable: org.apache.pulsar.functions.instance.JavaInstanceRunnable#1
      local variable: java.util.concurrent.CompletableFuture#34
    at java.lang.Thread.run(Thread.java:748)
```
After disabling batching on functions in a custom build, the issue disappeared. However, performance is severely degraded when batching is disabled, so it's not a viable workaround. Blocking on the semaphore is expected behavior here, since function producers are configured with `blockIfQueueFull = true` by default.
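For context on why that thread is parked, here is a simplified model of the send path (a hypothetical `ToyProducer`, not `ProducerImpl`'s real code): a semaphore bounds the number of pending messages, and with blocking on a full queue enabled, the calling thread parks in `acquire()` until a broker ack frees a slot, so if acks stop arriving the caller stays parked indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical sketch for illustration only; not ProducerImpl's implementation.
class ToyProducer {
    private final Semaphore pendingSlots;     // one permit per pending (un-acked) message
    private final boolean blockIfQueueFull;

    ToyProducer(int maxPendingMessages, boolean blockIfQueueFull) {
        this.pendingSlots = new Semaphore(maxPendingMessages, true);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    CompletableFuture<Void> sendAsync(String message) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        try {
            if (blockIfQueueFull) {
                pendingSlots.acquire();              // parks here when the queue is full
            } else if (!pendingSlots.tryAcquire()) {
                ack.completeExceptionally(new IllegalStateException("producer queue is full"));
                return ack;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            ack.completeExceptionally(e);
            return ack;
        }
        // A slot is released only when the send completes (broker ack or failure);
        // if acks never arrive, the queue stays full and callers block forever.
        ack.whenComplete((ignored, error) -> pendingSlots.release());
        return ack;
    }
}
```

If the function's consuming and producing happen on the same thread (the dump shows the park inside `JavaInstanceRunnable.run()`), a producer queue that never drains would also stall consumption.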
That blocking implies the producer queue is full. So the remaining mysteries are: why is the producer queue full? (Why is the function not getting acks from the broker?) Or, if the function is getting acks from the broker, why is the Pulsar client not batching and sending? We also still need to confirm whether the thread responsible for consuming is indeed blocked by producing. (@codelipenghui or @jerrypeng, do either of you know?)

Many thanks to @rdhabalia for helping to deep-dive on this issue. Also thanks to @lhotari for helping with the concurrency side of things.
