This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new f9b8ac46118 [fix][broker] Fix 
ManagedLedgerImpl.advanceCursorsIfNecessary() method may lose non-durable 
cursor properties in race condition (#25796)
f9b8ac46118 is described below

commit f9b8ac461180f5412a152974671b43d5075e9eac
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 18 17:14:10 2026 +0800

    [fix][broker] Fix ManagedLedgerImpl.advanceCursorsIfNecessary() method may 
lose non-durable cursor properties in race condition (#25796)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 62 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4c2a344d85c..54be0723d47 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3281,7 +3281,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     && 
highestPositionToDelete.compareTo(cursor.getManagedLedger()
                     .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && 
cursor instanceof NonDurableCursorImpl
                     && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, 
cursor.getProperties(), new MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, null, new 
MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a03de59736e..693823f8b94 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -5358,4 +5359,65 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         // Verify properties are preserved after cursor reset
         assertEquals(cursor.getProperties(), expectedProperties);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() 
throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(0, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(0);
+
+        @Cleanup
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) 
factory.open("testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties", 
config);
+        @Cleanup
+        ManagedCursorImpl durableCursor = (ManagedCursorImpl) 
ledger.openCursor("durableCursor1");
+        @Cleanup
+        NonDurableCursorImpl realNonDurableCursor =
+                (NonDurableCursorImpl) 
ledger.newNonDurableCursor(PositionFactory.EARLIEST);
+        NonDurableCursorImpl nonDurableCursor = spy(realNonDurableCursor);
+
+        ledger.getCursors().removeCursor(realNonDurableCursor.getName());
+        ledger.getCursors().add(nonDurableCursor, null);
+
+        CountDownLatch advanceCursorsMarkDeleteEnteredLatch = new 
CountDownLatch(1);
+        CountDownLatch nonDurableCursorsMarkDeleteCompletedLatch = new 
CountDownLatch(1);
+        CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new 
CountDownLatch(1);
+
+        doAnswer(invocation -> {
+            Map<String, Long> invocationProperties = invocation.getArgument(1);
+            // Pause the advanceCursorsIfNecessary mark-delete so the 
nonDurableCursor markDelete() can complete first.
+            if (invocationProperties == null || 
invocationProperties.isEmpty()) {
+                advanceCursorsMarkDeleteEnteredLatch.countDown();
+                assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, 
TimeUnit.SECONDS));
+                try {
+                    return invocation.callRealMethod();
+                } finally {
+                    advanceCursorsMarkDeleteCompletedLatch.countDown();
+                }
+            }
+
+            return invocation.callRealMethod();
+        }).when(nonDurableCursor)
+                .internalAsyncMarkDelete(any(Position.class), 
nullable(Map.class), any(MarkDeleteCallback.class),
+                        nullable(Object.class), nullable(Runnable.class));
+
+        ledger.addEntry("entry-1".getBytes(Encoding));
+        Position pos2 = ledger.addEntry("entry-2".getBytes(Encoding));
+
+        // Mark-delete the durable cursor to trigger trimming, which advances 
non-durable cursors.
+        durableCursor.markDelete(pos2);
+        assertTrue(advanceCursorsMarkDeleteEnteredLatch.await(5, 
TimeUnit.SECONDS));
+
+        String propertyKey = "test-property";
+        Map<String, Long> properties = new HashMap<>();
+        properties.put(propertyKey, 1L);
+        nonDurableCursor.markDelete(pos2, properties);
+        nonDurableCursorsMarkDeleteCompletedLatch.countDown();
+
+        assertTrue(advanceCursorsMarkDeleteCompletedLatch.await(5, 
TimeUnit.SECONDS));
+        assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2);
+        assertEquals(nonDurableCursor.getProperties(), properties);
+    }
 }

Reply via email to