This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 937335bab2a1078021940513d2964b2b9e6138e6
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jun 17 06:40:37 2025 +0800

    [fix][ml]Received more than once callback when calling cursor.delete (#24405)

    (cherry picked from commit 72953bf3d692409ba313bd8fe5d237c890a693aa)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 +++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 28 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fc70ce4c204..af231d80e9d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2355,7 +2355,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         PositionImpl newMarkDeletePosition = null;
 
         lock.writeLock().lock();
-
+        boolean skipMarkDeleteBecauseAckedNothing = false;
         try {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
@@ -2424,6 +2424,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (individualDeletedMessages.isEmpty()) {
                 // No changes to individually deleted messages, so nothing to do at this point
+                skipMarkDeleteBecauseAckedNothing = true;
                 return;
             }
 
@@ -2441,6 +2442,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (range == null) {
                 // The set was completely cleaned up now
+                skipMarkDeleteBecauseAckedNothing = true;
                 return;
             }
 
@@ -2467,9 +2469,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             callback.deleteFailed(getManagedLedgerException(e), ctx);
             return;
         } finally {
-            boolean empty = individualDeletedMessages.isEmpty();
             lock.writeLock().unlock();
-            if (empty) {
+            if (skipMarkDeleteBecauseAckedNothing) {
                 callback.deleteComplete(ctx);
             }
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d223164a07c..3f6545f9136 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -5233,5 +5233,33 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ml.delete();
     }
 
+    @Test
+    public void testCallbackTimes() throws Exception {
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testCallbackTimes");
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
+        Position position1 = ml.addEntry(new byte[1]);
+        Position position2 = ml.addEntry(new byte[2]);
+        AtomicInteger executedCallbackTimes = new AtomicInteger();
+        cursor.asyncDelete(Arrays.asList(position1, position2), new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                executedCallbackTimes.incrementAndGet();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                executedCallbackTimes.incrementAndGet();
+            }
+        }, new Object());
+        // Verify that the executed count of callback is "1".
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(executedCallbackTimes.get() > 0);
+        });
+        Thread.sleep(2000);
+        assertEquals(executedCallbackTimes.get(), 1);
+        // cleanup.
+        ml.delete();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
