This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 666ad3b Fixed accessing MessageImpl after it was enqueued on user
queue (#11824)
666ad3b is described below
commit 666ad3b13cbbf35c329fb3fd433f117d6d893e0a
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Aug 28 08:46:41 2021 -0700
Fixed accessing MessageImpl after it was enqueued on user queue (#11824)
---
.../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f4002c2..d2b4f2a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -727,8 +727,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
+ int messageSize = message.size();
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
- increaseIncomingMessageSize(message);
+ // After we have enqueued the messages on `incomingMessages`
queue, we cannot touch the message instance
+ // anymore, since for pooled messages, this instance was possibly
already been released and recycled.
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
}
return hasEnoughMessagesForBatchReceive();
}
@@ -970,10 +973,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return pendingBatchReceives != null && hasNextBatchReceive();
}
- protected void increaseIncomingMessageSize(final Message<?> message) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
- }
-
protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}