This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2f474463abb3a29978cd540ae1de41c75bb47af0
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Feb 6 00:58:59 2026 +0800

    [fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous async mark delete properties in race condition (#25165)
    
    (cherry picked from commit bea6f8ac48740b44c5d162bd4aa0ca851eb91ae5)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   5 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 100 +++++++++++++++++++++
 2 files changed, 101 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c120b9fa719..1ab6ceab9a2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2681,10 +2681,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
 
         try {
-            Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
-                    : Collections.emptyMap();
-
-            internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
+            internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() {
                 @Override
                 public void markDeleteComplete(Object ctx) {
                     callback.deleteComplete(ctx);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bfb9b6ecca1..f41269b49af 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -5926,6 +5926,106 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes());
     }
 
+    @Test(timeOut = 20000)
+    public void testAsyncMarkDeleteNeverLoseProperties() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(3);
+        config.setRetentionTime(20, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(5);
+
+        @Cleanup ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config);
+        @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        int numMessages = 20;
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
+            positions.add(pos);
+        }
+
+        String propertyKey = "test-property";
+        CountDownLatch latch = new CountDownLatch(numMessages); // one count per asyncMarkDelete issued below
+        for (int i = 0; i < numMessages; i++) {
+            Map<String, Long> properties = new HashMap<>();
+            properties.put(propertyKey, (long) i); // each mark-delete carries its own index as the value
+            cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
+                @Override
+                public void markDeleteComplete(Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                    fail("Mark delete should succeed");
+                }
+            }, null);
+        }
+
+        latch.await(); // wait for all numMessages markDeleteComplete callbacks
+
+        int lastIndex = numMessages - 1;
+        assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
+        Map<String, Long> properties = cursor.getProperties();
+        assertEquals(properties.size(), 1);
+        assertEquals(properties.get(propertyKey), lastIndex); // value passed with the final mark-delete
+    }
+
+    @Test(timeOut = 20000)
+    public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(11);
+
+        @Cleanup ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config);
+        @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        int numMessages = 10;
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
+            positions.add(pos);
+        }
+
+        String propertyKey = "test-property";
+        CountDownLatch latch = new CountDownLatch(numMessages); // numMessages - 1 mark-deletes plus one delete
+        for (int i = 0; i < numMessages - 1; i++) { // mark-delete every entry except the last
+            Map<String, Long> properties = new HashMap<>();
+            properties.put(propertyKey, (long) i);
+            cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
+                @Override
+                public void markDeleteComplete(Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                    fail("Mark delete should succeed");
+                }
+            }, null);
+        }
+
+        int lastIndex = numMessages - 1;
+        cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() { // last entry uses asyncDelete
+            @Override
+            public void deleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                fail("Delete should succeed");
+            }
+        }, null);
+
+        latch.await(); // wait for every mark-delete callback and the delete callback
+
+        assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
+        Map<String, Long> properties = cursor.getProperties();
+        assertEquals(properties.size(), 1);
+        assertEquals(properties.get(propertyKey), lastIndex - 1); // value passed with the final asyncMarkDelete
+    }
+
     class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
         Map<Long, Integer> ledgerErrors = new HashMap<>();
 

Reply via email to