This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 79233ae  Fix deleteTransactionMarker memory leak (#9752)
79233ae is described below

commit 79233ae9e5afdae66bf3408121b2016d17045590
Author: Michael Marshall <[email protected]>
AuthorDate: Sun Feb 28 18:17:29 2021 -0700

    Fix deleteTransactionMarker memory leak (#9752)
    
    Relates to https://github.com/apache/pulsar/issues/9725.
    
    Opening this PR because https://github.com/apache/pulsar/pull/9751 will
    (likely) have conflicts when cherry-picking. See
    https://github.com/apache/pulsar/pull/9751 for details on the change.
---
 .../service/persistent/PersistentSubscription.java       | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9c0b524..c39d4bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -379,12 +379,16 @@ public class PersistentSubscription implements 
Subscription {
             managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() 
{
                 @Override
                 public void readEntryComplete(Entry entry, Object ctx) {
-                    MessageMetadata messageMetadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-                    isDeleteTransactionMarkerInProcess = false;
-                    if (Markers.isTxnCommitMarker(messageMetadata) || 
Markers.isTxnAbortMarker(messageMetadata)) {
-                        lastMarkDeleteForTransactionMarker = position;
-                        messageMetadata.recycle();
-                        
acknowledgeMessage(Collections.singletonList(nextPosition), ackType, 
properties);
+                    try {
+                        MessageMetadata messageMetadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                        isDeleteTransactionMarkerInProcess = false;
+                        if (Markers.isTxnCommitMarker(messageMetadata) || 
Markers.isTxnAbortMarker(messageMetadata)) {
+                            lastMarkDeleteForTransactionMarker = position;
+                            messageMetadata.recycle();
+                            
acknowledgeMessage(Collections.singletonList(nextPosition), ackType, 
properties);
+                        }
+                    } finally {
+                        entry.release();
                     }
                 }
 

Reply via email to