This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 014a69c701513e53508afde69411784d82964924 Author: Matteo Merli <[email protected]> AuthorDate: Sat Aug 28 08:46:41 2021 -0700 Fixed accessing MessageImpl after it was enqueued on user queue (#11824) (cherry picked from commit 666ad3b13cbbf35c329fb3fd433f117d6d893e0a) --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index f4002c2..d2b4f2a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -727,8 +727,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) { + int messageSize = message.size(); if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - increaseIncomingMessageSize(message); + // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance + // anymore, since for pooled messages, this instance was possibly already been released and recycled. + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); } return hasEnoughMessagesForBatchReceive(); } @@ -970,10 +973,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return pendingBatchReceives != null && hasNextBatchReceive(); } - protected void increaseIncomingMessageSize(final Message<?> message) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); - } - protected void resetIncomingMessageSize() { INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); }
