This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71f3928 Bugfix - release and recycle on discarded messages (#4342)
71f3928 is described below
commit 71f3928722fc702f2f8699c68dd7262d53d0a630
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Fri May 24 13:48:06 2019 -0300
Bugfix - release and recycle on discarded messages (#4342)
Don't leak resources when a message is being discarded.
*Modifications*
- Fix missing release() and recycle() for discarded message on
receiveIndividualMessagesFromBatch method.
- Fix argument missing of debug logging {}-placeholder.
- Fix unnecessary variable reference `payload` on messageReceived().
---
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8d91ca1..3426e22 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -747,17 +747,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
messageId.getEntryId());
}
- MessageMetadata msgMetadata = null;
- ByteBuf payload = headersAndPayload;
-
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx,
ValidationError.ChecksumMismatch);
return;
}
+ MessageMetadata msgMetadata;
try {
- msgMetadata = Commands.parseMessageMetadata(payload);
+ msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx,
ValidationError.ChecksumMismatch);
return;
@@ -768,15 +766,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex());
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Ignoring message as it was already being
acked earlier by same consumer {}/{}",
- topic, subscription, msgId);
+ log.debug("[{}] [{}] Ignoring message as it was already being
acked earlier by same consumer {}/{}",
+ topic, subscription, consumerName, msgId);
}
increaseAvailablePermits(cnx, numMessages);
return;
}
- ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, payload, cnx);
+ ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, headersAndPayload, cnx);
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
@@ -950,6 +948,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.debug("[{}] [{}] Ignoring message from before the
startMessageId", subscription,
consumerName);
}
+ singleMessagePayload.release();
+ singleMessageMetadataBuilder.recycle();
++skippedMessages;
continue;