This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bea6f8ac487 [fix][broker] Fix ManagedCursorImpl.asyncDelete() method
may lose previous async mark delete properties in race condition (#25165)
bea6f8ac487 is described below
commit bea6f8ac48740b44c5d162bd4aa0ca851eb91ae5
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Feb 6 00:58:59 2026 +0800
[fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous
async mark delete properties in race condition (#25165)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 100 +++++++++++++++++++++
2 files changed, 101 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c120b9fa719..1ab6ceab9a2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2681,10 +2681,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
try {
- Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
- : Collections.emptyMap();
-
- internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
+ internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
callback.deleteComplete(ctx);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bfb9b6ecca1..f41269b49af 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -5926,6 +5926,106 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(),
"msg".getBytes());
}
+ @Test(timeOut = 20000)
+ public void testAsyncMarkDeleteNeverLoseProperties() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(3);
+ config.setRetentionTime(20, TimeUnit.SECONDS);
+ config.setRetentionSizeInMB(5);
+
+ @Cleanup ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config);
+ @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ int numMessages = 20;
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
+ positions.add(pos);
+ }
+
+ String propertyKey = "test-property";
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ for (int i = 0; i < numMessages; i++) {
+ Map<String, Long> properties = new HashMap<>();
+ properties.put(propertyKey, (long) i);
+ cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+ fail("Mark delete should succeed");
+ }
+ }, null);
+ }
+
+ latch.await();
+
+ int lastIndex = numMessages - 1;
+ assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
+ Map<String, Long> properties = cursor.getProperties();
+ assertEquals(properties.size(), 1);
+ assertEquals(properties.get(propertyKey), lastIndex);
+ }
+
+ @Test(timeOut = 20000)
+ public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(11);
+
+ @Cleanup ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config);
+ @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ int numMessages = 10;
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
+ positions.add(pos);
+ }
+
+ String propertyKey = "test-property";
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ for (int i = 0; i < numMessages - 1; i++) {
+ Map<String, Long> properties = new HashMap<>();
+ properties.put(propertyKey, (long) i);
+ cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+ fail("Mark delete should succeed");
+ }
+ }, null);
+ }
+
+ int lastIndex = numMessages - 1;
+ cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() {
+ @Override
+ public void deleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+ fail("Delete should succeed");
+ }
+ }, null);
+
+ latch.await();
+
+ assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
+ Map<String, Long> properties = cursor.getProperties();
+ assertEquals(properties.size(), 1);
+ assertEquals(properties.get(propertyKey), lastIndex - 1);
+ }
+
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();