This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f3928  Bugfix - release and recycle on discarded messages (#4342)
71f3928 is described below

commit 71f3928722fc702f2f8699c68dd7262d53d0a630
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Fri May 24 13:48:06 2019 -0300

    Bugfix - release and recycle on discarded messages (#4342)
    
    Don't leak resources when a message is being discarded.
    
    *Modifications*
    
      - Fix missing release() and recycle() for discarded message on
        receiveIndividualMessagesFromBatch method.
      - Fix argument missing of debug logging {}-placeholder.
      - Fix unnecessary variable reference `payload` on messageReceived().
---
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8d91ca1..3426e22 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -747,17 +747,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     messageId.getEntryId());
         }
 
-        MessageMetadata msgMetadata = null;
-        ByteBuf payload = headersAndPayload;
-
         if (!verifyChecksum(headersAndPayload, messageId)) {
             // discard message with checksum error
             discardCorruptedMessage(messageId, cnx, 
ValidationError.ChecksumMismatch);
             return;
         }
 
+        MessageMetadata msgMetadata;
         try {
-            msgMetadata = Commands.parseMessageMetadata(payload);
+            msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
         } catch (Throwable t) {
             discardCorruptedMessage(messageId, cnx, 
ValidationError.ChecksumMismatch);
             return;
@@ -768,15 +766,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex());
         if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Ignoring message as it was already being 
acked earlier by same consumer {}/{}",
-                        topic, subscription, msgId);
+                log.debug("[{}] [{}] Ignoring message as it was already being 
acked earlier by same consumer {}/{}",
+                        topic, subscription, consumerName, msgId);
             }
 
             increaseAvailablePermits(cnx, numMessages);
             return;
         }
 
-        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
msgMetadata, payload, cnx);
+        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
msgMetadata, headersAndPayload, cnx);
 
         boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
 
@@ -950,6 +948,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         log.debug("[{}] [{}] Ignoring message from before the 
startMessageId", subscription,
                                 consumerName);
                     }
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
 
                     ++skippedMessages;
                     continue;

Reply via email to