This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 937335bab2a1078021940513d2964b2b9e6138e6
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jun 17 06:40:37 2025 +0800

    [fix][ml]Received more than once callback when calling cursor.delete (#24405)
    
    (cherry picked from commit 72953bf3d692409ba313bd8fe5d237c890a693aa)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 +++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 28 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fc70ce4c204..af231d80e9d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2355,7 +2355,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         PositionImpl newMarkDeletePosition = null;
 
         lock.writeLock().lock();
-
+        boolean skipMarkDeleteBecauseAckedNothing = false;
         try {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
@@ -2424,6 +2424,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (individualDeletedMessages.isEmpty()) {
                 // No changes to individually deleted messages, so nothing to do at this point
+                skipMarkDeleteBecauseAckedNothing = true;
                 return;
             }
 
@@ -2441,6 +2442,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (range == null) {
                 // The set was completely cleaned up now
+                skipMarkDeleteBecauseAckedNothing = true;
                 return;
             }
 
@@ -2467,9 +2469,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             callback.deleteFailed(getManagedLedgerException(e), ctx);
             return;
         } finally {
-            boolean empty = individualDeletedMessages.isEmpty();
             lock.writeLock().unlock();
-            if (empty) {
+            if (skipMarkDeleteBecauseAckedNothing) {
                 callback.deleteComplete(ctx);
             }
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d223164a07c..3f6545f9136 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -5233,5 +5233,33 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ml.delete();
     }
 
+    @Test
+    public void testCallbackTimes() throws Exception {
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testCallbackTimes");
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
+        Position position1 = ml.addEntry(new byte[1]);
+        Position position2 = ml.addEntry(new byte[2]);
+        AtomicInteger executedCallbackTimes = new AtomicInteger();
+        cursor.asyncDelete(Arrays.asList(position1, position2), new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                executedCallbackTimes.incrementAndGet();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                executedCallbackTimes.incrementAndGet();
+            }
+        }, new Object());
+        // Verify that the executed count of callback is "1".
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(executedCallbackTimes.get() > 0);
+        });
+        Thread.sleep(2000);
+        assertEquals(executedCallbackTimes.get(), 1);
+        // cleanup.
+        ml.delete();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }

Reply via email to