This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 79233ae Fix deleteTransactionMarker memory leak (#9752)
79233ae is described below
commit 79233ae9e5afdae66bf3408121b2016d17045590
Author: Michael Marshall <[email protected]>
AuthorDate: Sun Feb 28 18:17:29 2021 -0700
Fix deleteTransactionMarker memory leak (#9752)
Relates to https://github.com/apache/pulsar/issues/9725.
Opening this PR directly against branch-2.7 because
https://github.com/apache/pulsar/pull/9751 will (likely) have conflicts
when cherry-picked. See https://github.com/apache/pulsar/pull/9751 for
details on the change.
---
.../service/persistent/PersistentSubscription.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9c0b524..c39d4bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -379,12 +379,16 @@ public class PersistentSubscription implements Subscription {
managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
- MessageMetadata messageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
- isDeleteTransactionMarkerInProcess = false;
- if (Markers.isTxnCommitMarker(messageMetadata) || Markers.isTxnAbortMarker(messageMetadata)) {
- lastMarkDeleteForTransactionMarker = position;
- messageMetadata.recycle();
- acknowledgeMessage(Collections.singletonList(nextPosition), ackType, properties);
+ try {
+ MessageMetadata messageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+ isDeleteTransactionMarkerInProcess = false;
+ if (Markers.isTxnCommitMarker(messageMetadata) || Markers.isTxnAbortMarker(messageMetadata)) {
+ lastMarkDeleteForTransactionMarker = position;
+ messageMetadata.recycle();
+ acknowledgeMessage(Collections.singletonList(nextPosition), ackType, properties);
+ }
+ } finally {
+ entry.release();
}
}