This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2f474463abb3a29978cd540ae1de41c75bb47af0 Author: Oneby Wang <[email protected]> AuthorDate: Fri Feb 6 00:58:59 2026 +0800 [fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous async mark delete properties in race condition (#25165) (cherry picked from commit bea6f8ac48740b44c5d162bd4aa0ca851eb91ae5) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 100 +++++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c120b9fa719..1ab6ceab9a2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2681,10 +2681,7 @@ public class ManagedCursorImpl implements ManagedCursor { } try { - Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties - : Collections.emptyMap(); - - internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { + internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { callback.deleteComplete(ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bfb9b6ecca1..f41269b49af 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5926,6 +5926,106 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes()); } + @Test(timeOut = 20000) + public void testAsyncMarkDeleteNeverLoseProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(3); + config.setRetentionTime(20, TimeUnit.SECONDS); + config.setRetentionSizeInMB(5); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 20; + List<Position> positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages; i++) { + Map<String, Long> properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + latch.await(); + + int lastIndex = numMessages - 1; + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map<String, Long> properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex); + } + + @Test(timeOut = 20000) + public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(11); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 10; + List<Position> positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages - 1; i++) { + Map<String, Long> properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + int lastIndex = numMessages - 1; + cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Delete should succeed"); + } + }, null); + + latch.await(); + + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map<String, Long> properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex - 1); + } + class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map<Long, Integer> ledgerErrors = new HashMap<>();
