merlimat opened a new pull request #11824:
URL: https://github.com/apache/pulsar/pull/11824
### Motivation
When a consumer with message pooling enabled consumes from a partitioned topic,
an exception is thrown (and not shown, but that's a different issue).
After a short time, consumption silently stops.
The exception is:
```
java.lang.NullPointerException
    at org.apache.pulsar.client.impl.MessageImpl.size(MessageImpl.java:398)
    at org.apache.pulsar.client.impl.TopicMessageImpl.size(TopicMessageImpl.java:98)
    at org.apache.pulsar.client.impl.ConsumerBase.increaseIncomingMessageSize(ConsumerBase.java:974)
    at org.apache.pulsar.client.impl.ConsumerBase.enqueueMessageAndCheckBatchReceive(ConsumerBase.java:731)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.messageReceived(MultiTopicsConsumerImpl.java:301)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$receiveMessageFromConsumer$8(MultiTopicsConsumerImpl.java:255)
    at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2144)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalReceiveAsync$3(ConsumerImpl.java:434)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:830)
```
The issue is easily reproducible by running `pulsar-perf consume` against a
partitioned topic, since `pulsar-perf` uses message pooling by default.
The root cause is that we access the message instance after it has been
put on the queue from which the application receives it.
The user is doing something like:
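```java
while (true) {
    Message<?> msg = consumer.receive();
    // ... process the message ...
    consumer.acknowledge(msg);
    // Hand the message back to the pool; its fields are nulled here.
    msg.release();
}
```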
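When `msg.release()` is called, the object's fields are nulled and the object
is returned to the pool. For this reason, the client library must not touch
the object once it has been put on the queue: by then the application may
already have released it, which is exactly what makes `MessageImpl.size()`
fail in the stack trace above.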
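To make the "don't touch it after enqueueing" rule concrete, here is a minimal, single-threaded Java sketch. It assumes a simplified model: `PooledMessage`, `enqueueUnsafe`, `enqueueSafe` and `receiveAndRelease` are hypothetical names, not classes or methods of the Pulsar client, and the `Runnable` passed to each enqueue method stands in for the application thread running at an unlucky moment. The unsafe variant reads the message after publishing it and hits the same kind of `NullPointerException` as `MessageImpl.size()`; the safe variant captures what it needs before the message becomes visible.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the hazard. All names here are hypothetical,
// not Pulsar client APIs.
public class PooledMessageDemo {

    // Stand-in for a pooled message: release() nulls its fields before recycling.
    static final class PooledMessage {
        private byte[] payload;

        PooledMessage(byte[] payload) {
            this.payload = payload;
        }

        int size() {
            // Throws NullPointerException once release() has run.
            return payload.length;
        }

        void release() {
            // A real pool would now be free to hand this instance out again.
            payload = null;
        }
    }

    private final Queue<PooledMessage> incomingMessages = new ArrayDeque<>();
    // Bytes enqueued so far (not decremented on receive in this simplified model).
    private long incomingMessagesSize = 0;

    // Simulates the application: receive() followed by release().
    void receiveAndRelease() {
        PooledMessage msg = incomingMessages.poll();
        if (msg != null) {
            msg.release();
        }
    }

    // Unsafe: reads the message after it has been published to the queue.
    // `interleaving` runs where another thread could act in real code.
    void enqueueUnsafe(PooledMessage msg, Runnable interleaving) {
        incomingMessages.add(msg);
        interleaving.run();
        incomingMessagesSize += msg.size(); // NPE if the user already released it
    }

    // Safe: captures everything it needs before the message becomes visible.
    void enqueueSafe(PooledMessage msg, Runnable interleaving) {
        int size = msg.size();
        incomingMessagesSize += size;
        incomingMessages.add(msg);
        interleaving.run();
        // No further access to msg after this point.
    }

    long incomingMessagesSize() {
        return incomingMessagesSize;
    }

    public static void main(String[] args) {
        PooledMessageDemo safe = new PooledMessageDemo();
        safe.enqueueSafe(new PooledMessage(new byte[42]), safe::receiveAndRelease);
        System.out.println("safe path counted " + safe.incomingMessagesSize() + " bytes");

        PooledMessageDemo unsafe = new PooledMessageDemo();
        try {
            unsafe.enqueueUnsafe(new PooledMessage(new byte[42]), unsafe::receiveAndRelease);
        } catch (NullPointerException e) {
            System.out.println("unsafe path: NullPointerException after release()");
        }
    }
}
```

Reading the size before publication is one way to honor the rule; it illustrates the principle and is not necessarily the exact change made in this PR. In the real client the interleaving comes from a second thread rather than an explicit callback, which is why the failure is intermittent there.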