This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a9938558a8a09414d9adadc520f4c797c750c8a7
Author: gaozhangmin <[email protected]>
AuthorDate: Tue Mar 29 01:00:20 2022 +0800

    [fix][transaction] Properly close transaction-buffer-sub non durable cursor 
(#14900)
    
    Fixes #14880
    
    ### Motivation
    
    Non durable cursor was not closed properly.
    
    ### Modifications
    For non durable cursor,  `cursor.asyncClose` did nothing. The proper way is 
`topic.getManagedLedger().asyncDeleteCursor`
    
    (cherry picked from commit 4e62ffc15714cfa49ed441f3ba7ededb866b9062)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java     | 13 ++++++++-----
 .../apache/pulsar/broker/transaction/TransactionTest.java   |  7 +++++--
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 66ce8f5..e2888d9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -55,6 +55,7 @@ import 
org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Codec;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -639,7 +640,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                                 }
                             }
 
-                            closeCursor(managedCursor);
+                            closeCursor(SUBSCRIPTION_NAME);
                             callBack.recoverComplete();
                         }, 
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                                 .getExecutor(this)).exceptionally(e -> {
@@ -656,17 +657,19 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             });
         }
 
-        private void closeCursor(ManagedCursor cursor) {
-            cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+        private void closeCursor(String subscriptionName) {
+            
topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName),
+                    new AsyncCallbacks.DeleteCursorCallback() {
                 @Override
-                public void closeComplete(Object ctx) {
+                public void deleteCursorComplete(Object ctx) {
                     log.info("[{}]Transaction buffer snapshot recover cursor 
close complete.", topic.getName());
                 }
 
                 @Override
-                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
+                public void deleteCursorFailed(ManagedLedgerException 
exception, Object ctx) {
                     log.error("[{}]Transaction buffer snapshot recover cursor 
close fail.", topic.getName());
                 }
+
             }, null);
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 231c183..ae5e8c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -538,7 +538,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic("persistent://" + topic, false).get().get();
         
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
 
-        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
         doReturn("transaction-buffer-sub").when(managedCursor).getName();
         doReturn(true).when(managedCursor).hasMoreEntries();
         doAnswer(invocation -> {
@@ -579,6 +579,9 @@ public class TransactionTest extends TransactionTestBase {
         TransactionBuffer buffer3 = new 
TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer3.getStats().state, "Ready"));
+        persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
+            assertTrue(internalStats.cursors.isEmpty());
+        });
         managedCursors.removeCursor("transaction-buffer-sub");
     }
 
@@ -893,4 +896,4 @@ public class TransactionTest extends TransactionTestBase {
         pulsarServiceList.forEach((pulsarService ->
                 
pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
     }
-}
\ No newline at end of file
+}

Reply via email to