This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dd9462646ac [fix][broker] Fix
ManagedLedgerImpl.advanceCursorsIfNecessary() losing non-durable cursor
properties in a race condition (#25796)
dd9462646ac is described below
commit dd9462646ace229237d40e1d6036b9aa3c917599
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 18 17:14:10 2026 +0800
[fix][broker] Fix ManagedLedgerImpl.advanceCursorsIfNecessary() losing
non-durable cursor properties in a race condition (#25796)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 62 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4a1a3d12ab0..c0a870ea29a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3281,7 +3281,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
&&
highestPositionToDelete.compareTo(cursor.getManagedLedger()
.getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() &&
cursor instanceof NonDurableCursorImpl
&& ((NonDurableCursorImpl) cursor).isReadCompacted())) {
- cursor.asyncMarkDelete(highestPositionToDelete,
cursor.getProperties(), new MarkDeleteCallback() {
+ cursor.asyncMarkDelete(highestPositionToDelete, null, new
MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index bef94ecec5e..5dbdcaa71e8 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -5409,4 +5410,65 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
// Verify properties are preserved after cursor reset
assertEquals(cursor.getProperties(), expectedProperties);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties()
throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+ config.setRetentionTime(0, TimeUnit.SECONDS);
+ config.setRetentionSizeInMB(0);
+
+ @Cleanup
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl)
factory.open("testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties",
config);
+ @Cleanup
+ ManagedCursorImpl durableCursor = (ManagedCursorImpl)
ledger.openCursor("durableCursor1");
+ @Cleanup
+ NonDurableCursorImpl realNonDurableCursor =
+ (NonDurableCursorImpl)
ledger.newNonDurableCursor(PositionFactory.EARLIEST);
+ NonDurableCursorImpl nonDurableCursor = spy(realNonDurableCursor);
+
+ ledger.getCursors().removeCursor(realNonDurableCursor.getName());
+ ledger.getCursors().add(nonDurableCursor, null);
+
+ CountDownLatch advanceCursorsMarkDeleteEnteredLatch = new
CountDownLatch(1);
+ CountDownLatch nonDurableCursorsMarkDeleteCompletedLatch = new
CountDownLatch(1);
+ CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new
CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ Map<String, Long> invocationProperties = invocation.getArgument(1);
+ // Pause the advanceCursorsIfNecessary mark-delete so the
nonDurableCursor markDelete() can complete first.
+ if (invocationProperties == null ||
invocationProperties.isEmpty()) {
+ advanceCursorsMarkDeleteEnteredLatch.countDown();
+ assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5,
TimeUnit.SECONDS));
+ try {
+ return invocation.callRealMethod();
+ } finally {
+ advanceCursorsMarkDeleteCompletedLatch.countDown();
+ }
+ }
+
+ return invocation.callRealMethod();
+ }).when(nonDurableCursor)
+ .internalAsyncMarkDelete(any(Position.class),
nullable(Map.class), any(MarkDeleteCallback.class),
+ nullable(Object.class), nullable(Runnable.class));
+
+ ledger.addEntry("entry-1".getBytes(Encoding));
+ Position pos2 = ledger.addEntry("entry-2".getBytes(Encoding));
+
+ // Mark-delete the durable cursor to trigger trimming, which advances
non-durable cursors.
+ durableCursor.markDelete(pos2);
+ assertTrue(advanceCursorsMarkDeleteEnteredLatch.await(5,
TimeUnit.SECONDS));
+
+ String propertyKey = "test-property";
+ Map<String, Long> properties = new HashMap<>();
+ properties.put(propertyKey, 1L);
+ nonDurableCursor.markDelete(pos2, properties);
+ nonDurableCursorsMarkDeleteCompletedLatch.countDown();
+
+ assertTrue(advanceCursorsMarkDeleteCompletedLatch.await(5,
TimeUnit.SECONDS));
+ assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2);
+ assertEquals(nonDurableCursor.getProperties(), properties);
+ }
}