This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 666ad3b  Fixed accessing MessageImpl after it was enqueued on user 
queue (#11824)
666ad3b is described below

commit 666ad3b13cbbf35c329fb3fd433f117d6d893e0a
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Aug 28 08:46:41 2021 -0700

    Fixed accessing MessageImpl after it was enqueued on user queue (#11824)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f4002c2..d2b4f2a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -727,8 +727,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
+        int messageSize = message.size();
         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
-            increaseIncomingMessageSize(message);
+            // After we have enqueued the message on the `incomingMessages` 
queue, we cannot touch the message instance
+            // anymore, since for pooled messages, this instance may 
already have been released and recycled.
+            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
         }
         return hasEnoughMessagesForBatchReceive();
     }
@@ -970,10 +973,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         return pendingBatchReceives != null && hasNextBatchReceive();
     }
 
-    protected void increaseIncomingMessageSize(final Message<?> message) {
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
-    }
-
     protected void resetIncomingMessageSize() {
         INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
     }

Reply via email to