This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 252050cca35 [fix][broker] Fix markDeletedPosition race condition in
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method
(#25110)
252050cca35 is described below
commit 252050cca35cb94b624bc0e14f6545d208a82c26
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Jan 12 20:48:33 2026 +0800
[fix][broker] Fix markDeletedPosition race condition in
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method
(#25110)
(cherry picked from commit 1617bb22173a117f24d47ac6f11cc2f7c68de635)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 ++++---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 17 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 64 ++++---
.../mledger/impl/NonDurableCursorImpl.java | 8 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 23 ++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 187 ++++++++++++++++++++-
6 files changed, 285 insertions(+), 72 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index aaf67108c8b..0b789fc2a24 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1249,14 +1249,17 @@ public class ManagedCursorImpl implements ManagedCursor
{
@Override
public long getNumberOfEntries() {
- if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
+ Position readPos = readPosition;
+ Position lastPosition = ledger.getLastPosition();
+ Position nextPosition = lastPosition.getNext();
+ if (readPos.compareTo(nextPosition) > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read position {} is ahead of last
position {}. There are no entries to read",
- ledger.getName(), name, readPosition,
ledger.getLastPosition());
+ ledger.getName(), name, readPos, lastPosition);
}
return 0;
} else {
- return getNumberOfEntries(Range.closedOpen(readPosition,
ledger.getLastPosition().getNext()));
+ return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
}
}
@@ -2250,13 +2253,15 @@ public class ManagedCursorImpl implements ManagedCursor
{
}
Position newPosition = ackBatchPosition(position);
- if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
+ Position markDeletePos = markDeletePosition;
+ Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
+ if (lastConfirmedEntry.compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
- long ledgerEntries =
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
- Long nextValidLedger =
ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
+ long ledgerEntries =
ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
+ Long nextValidLedger =
ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
shouldCursorMoveForward = nextValidLedger != null
- && (markDeletePosition.getEntryId() + 1 >=
ledgerEntries)
+ && (markDeletePos.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting
mark-delete-position", e);
@@ -2264,11 +2269,11 @@ public class ManagedCursorImpl implements ManagedCursor
{
if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since
all the entries have been consumed",
- ledger.getName(), markDeletePosition, newPosition);
+ ledger.getName(), markDeletePos, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid
markDelete {} is ahead of last-confirmed-entry {}"
- + " for cursor [{}]", ledger.getName(), position,
ledger.getLastConfirmedEntry(), name);
+ + " for cursor [{}]", ledger.getName(), position,
lastConfirmedEntry, name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid
mark deleted position"), ctx);
return;
@@ -2324,11 +2329,15 @@ public class ManagedCursorImpl implements ManagedCursor
{
final MarkDeleteCallback callback, final Object ctx, Runnable
alignAcknowledgeStatusAfterPersisted) {
ledger.mbean.addMarkDeleteOp();
- MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties,
callback, ctx,
- alignAcknowledgeStatusAfterPersisted);
-
// We cannot write to the ledger during the switch, need to wait until
the new metadata ledger is available
synchronized (pendingMarkDeleteOps) {
+ // use given properties or when missing, use the properties from
the previous field value
+ MarkDeleteEntry last = pendingMarkDeleteOps.peekLast();
+ Map<String, Long> propertiesToUse =
+ properties != null ? properties : (last != null ?
last.properties : getProperties());
+ MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition,
propertiesToUse, callback, ctx,
+ alignAcknowledgeStatusAfterPersisted);
+
// The state might have changed while we were waiting on the queue
mutex
switch (state) {
case Closed:
@@ -2696,17 +2705,20 @@ public class ManagedCursorImpl implements ManagedCursor
{
// update lastMarkDeleteEntry field if newPosition is later than the
current lastMarkDeleteEntry.newPosition
private void updateLastMarkDeleteEntryToLatest(final Position newPosition,
final Map<String, Long>
properties) {
- LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
- if (last != null && last.newPosition.compareTo(newPosition) > 0) {
- // keep current value, don't update
- return last;
- } else {
- // use given properties or when missing, use the properties
from the previous field value
- Map<String, Long> propertiesToUse =
- properties != null ? properties : (last != null ?
last.properties : Collections.emptyMap());
- return new MarkDeleteEntry(newPosition, propertiesToUse, null,
null);
- }
- });
+ synchronized (pendingMarkDeleteOps) {
+ // use given properties or when missing, use the properties from
the previous field value
+ MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast();
+ Map<String, Long> propertiesToUse =
+ properties != null ? properties : (lastPending != null ?
lastPending.properties : getProperties());
+ LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+ if (last != null && last.newPosition.compareTo(newPosition) >
0) {
+ // keep current value, don't update
+ return last;
+ } else {
+ return new MarkDeleteEntry(newPosition, propertiesToUse,
null, null);
+ }
+ });
+ }
}
/**
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 627e3225519..bcca3fb9a52 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -424,14 +424,15 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
public void initializeComplete() {
log.info("[{}] Successfully initialize managed
ledger", name);
pendingInitializeLedgers.remove(name,
pendingLedger);
- future.complete(newledger);
-
- // May need to update the cursor position
-
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
- // May need to trigger offloading
- if (config.isTriggerOffloadOnTopicLoad()) {
-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
- }
+ // May need to update the cursor position and
wait them finished
+
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex)
-> {
+ // ignore ex since it is handled in
maybeUpdateCursorBeforeTrimmingConsumedLedger
+ future.complete(newledger);
+ // May need to trigger offloading
+ if (config.isTriggerOffloadOnTopicLoad()) {
+
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+ }
+ });
}
@Override
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fe804179bb3..ea8fc4d3a64 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1655,11 +1655,10 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
+ // May need to update the cursor position
+ maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
metadataMutex.unlock();
-
- // May need to update the cursor position
- maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
@Override
@@ -2591,18 +2590,23 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
this.waitingEntryCallBacks.add(cb);
}
- public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
+ public CompletableFuture<Void>
maybeUpdateCursorBeforeTrimmingConsumedLedger() {
+ List<CompletableFuture<Void>> cursorMarkDeleteFutures = new
ArrayList<>();
for (ManagedCursor cursor : cursors) {
- Position lastAckedPosition =
cursor.getPersistentMarkDeletedPosition() != null
- ? cursor.getPersistentMarkDeletedPosition() :
cursor.getMarkDeletedPosition();
- LedgerInfo currPointedLedger =
ledgers.get(lastAckedPosition.getLedgerId());
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ cursorMarkDeleteFutures.add(future);
+
+ // Snapshot positions into a local variables to avoid race
condition.
+ Position markDeletedPosition = cursor.getMarkDeletedPosition();
+ Position lastAckedPosition = markDeletedPosition;
+ LedgerInfo curPointedLedger =
ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger =
Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);
- if (currPointedLedger != null) {
+ if (curPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1
- && lastAckedPosition.getEntryId() + 1 >=
currPointedLedger.getEntries()) {
+ && lastAckedPosition.getEntryId() + 1 >=
curPointedLedger.getEntries()) {
lastAckedPosition =
PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
}
} else {
@@ -2612,25 +2616,37 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
log.warn("Cursor: {} does not exist in the managed-ledger.",
cursor);
}
- if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
+ int compareResult =
lastAckedPosition.compareTo(markDeletedPosition);
+ if (compareResult > 0) {
Position finalPosition = lastAckedPosition;
- log.info("Reset cursor:{} to {} since ledger consumed
completely", cursor, lastAckedPosition);
- cursor.asyncMarkDelete(lastAckedPosition,
cursor.getProperties(),
- new MarkDeleteCallback() {
- @Override
- public void markDeleteComplete(Object ctx) {
- log.info("Successfully persisted cursor position
for cursor:{} to {}",
- cursor, finalPosition);
- }
+ log.info("Mark deleting cursor:{} from {} to {} since ledger
consumed completely.", cursor,
+ markDeletedPosition, lastAckedPosition);
+ cursor.asyncMarkDelete(lastAckedPosition, null, new
MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ log.info("Successfully persisted cursor position for
cursor:{} to {}", cursor, finalPosition);
+ future.complete(null);
+ }
- @Override
- public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
- log.warn("Failed to reset cursor: {} from {} to
{}. Trimming thread will retry next time.",
- cursor, cursor.getMarkDeletedPosition(),
finalPosition, exception);
- }
- }, null);
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ log.warn("Failed to mark delete: {} from {} to {}. ",
cursor, cursor.getMarkDeletedPosition(),
+ finalPosition, exception);
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ } else if (compareResult == 0) {
+ log.debug("No need to reset cursor: {}, last acked position
equals to current mark-delete position {}.",
+ cursor, markDeletedPosition);
+ future.complete(null);
+ } else {
+ // Should not happen
+ log.warn("Ledger rollover tries to mark delete an already
mark-deleted position. Current mark-delete:"
+ + " {} -- attempted position: {}",
markDeletedPosition, lastAckedPosition);
+ future.complete(null);
}
}
+ return FutureUtil.waitForAll(cursorMarkDeleteFutures);
}
private void trimConsumedLedgersInBackground() {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 246ca81c750..f06817c9408 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -102,11 +102,13 @@ public class NonDurableCursorImpl extends
ManagedCursorImpl {
protected void internalAsyncMarkDelete(final Position newPosition,
Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx, Runnable
alignAcknowledgeStatusAfterPersisted) {
// Bypass persistence of mark-delete position and individually deleted
messages info
-
- MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties,
callback, ctx,
- alignAcknowledgeStatusAfterPersisted);
+ MarkDeleteEntry mdEntry;
lock.writeLock().lock();
try {
+ // use given properties or when missing, use the properties from
the previous field value
+ Map<String, Long> propertiesToUse = properties != null ?
properties : getProperties();
+ mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse,
callback, ctx,
+ alignAcknowledgeStatusAfterPersisted);
lastMarkDeleteEntry = mdEntry;
mdEntry.alignAcknowledgeStatus();
} finally {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5e367e303d9..d94c2ced768 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -405,7 +405,8 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl)
ml.openCursor(cursorName);
- assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(),
lastEntry);
+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);
// cleanup.
ml.delete();
@@ -496,12 +497,18 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertTrue(slowestReadPosition.getLedgerId() >=
lastEntry.getLedgerId());
assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
// Verify the mark delete position can be recovered properly.
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl)
ml.openCursor(cursorName);
- assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(),
lastEntry);
+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+ // If previous ledger is trimmed, Cursor:
ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
+ // does not exist in the managed-ledger. Recovered cursor's position
will not be moved forward.
+ // TODO should be handled in ledger trim process.
+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
// cleanup.
ml.delete();
@@ -4431,7 +4438,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ManagedLedger ledger2 =
factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");
- assertEquals(c2.getMarkDeletedPosition(),
positions.get(positions.size() - 1));
+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
- 1));
});
}
@@ -4490,7 +4497,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ManagedLedger ledger2 =
factory2.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");
- assertEquals(c2.getMarkDeletedPosition(),
positions.get(positions.size() - 1));
+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
- 1));
});
}
@@ -4542,7 +4549,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ManagedLedger ledger2 =
factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");
- assertEquals(c2.getMarkDeletedPosition(),
positions.get(positions.size() - 1));
+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
- 1));
});
}
@@ -4805,7 +4812,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testLazyCursorLedgerCreation() throws Exception {
+ public void testEagerCursorLedgerCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
@@ -4830,8 +4837,8 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
ManagedCursorImpl cursor1 = (ManagedCursorImpl)
ledger.openCursor("test");
- assertEquals(cursor1.getState(), "NoLedger");
- assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
+ assertEquals(cursor1.getState(), "Open");
+
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);
// Verify the recovered cursor can work with new mark delete.
lastPosition = null;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 09cab9f1bdf..c5f75150495 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
@@ -56,6 +57,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -69,6 +71,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
@@ -3708,6 +3711,60 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
});
}
+ @Test(timeOut = 20000)
+ public void
testNeverThrowExceptionInMaybeUpdateCursorBeforeTrimmingConsumedLedger()
+ throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+ int entryNum = 100;
+
+ ManagedLedgerImpl realManagedLedger =
+ (ManagedLedgerImpl)
factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config);
+ ManagedLedgerImpl managedLedger = spy(realManagedLedger);
+ ManagedCursor cursor = managedLedger.openCursor("c1");
+
+ Deque<CompletableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
+ doAnswer(invocation -> {
+ CompletableFuture<Void> result = (CompletableFuture<Void>)
invocation.callRealMethod();
+ futures.offer(result);
+ return result;
+ }).when(managedLedger).maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+ final CountDownLatch latch = new CountDownLatch(entryNum);
+ // Two asyncMarkDelete operations running concurrently:
+ // 1. ledger rollover triggered
maybeUpdateCursorBeforeTrimmingConsumedLedger.
+ // 2. user triggered asyncMarkDelete.
+ for (int i = 0; i < entryNum; i++) {
+ managedLedger.asyncAddEntry("entry".getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ fail("Should never fail", exception);
+ }
+
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+
+ }
+ }, null);
+ }
+
+ latch.await();
+ assertEquals(cursor.getNumberOfEntries(), 0);
+
+ // Will not throw exception
+ FutureUtil.waitForAll(futures).get();
+ }
+
@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -4968,13 +5025,13 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testTrimmerRaceCondition() throws Exception {
+ public void testTrimmerRaceConditionInDurableCursor() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
config.setRetentionTime(0, TimeUnit.MILLISECONDS);
config.setRetentionSizeInMB(0);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testTrimmerRaceCondition", config);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testTrimmerRaceConditionInDurableCursor", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
// 1. Add Entry 1 (Ledger 1)
@@ -5003,20 +5060,138 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
}, null);
latch.await();
- assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
- assertEquals(ledger.getCursors().getSlowestReaderPosition(),
lastPosition);
+
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
assertEquals(cursor.getProperties(), properties);
- // 3. Add Entry 2. Triggers Rollover.
+ // 3. Add Entry 2. Triggers second rollover process.
// This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger
due to rollover
Position p = ledger.addEntry("entry-2".getBytes(Encoding));
// Wait for background tasks (metadata callback) to complete.
// We expect at least 2 ledgers (Rollover happened).
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
ledger.getLedgersInfo().size() >= 2);
- assertEquals(cursor.getPersistentMarkDeletedPosition(), new
ImmutablePositionImpl(p.getLedgerId(), -1));
+ // First ledger is all consumed and trimmed, left current ledger and
next empty ledger.
+ assertEquals(cursor.getPersistentMarkDeletedPosition(),
PositionFactory.create(p.getLedgerId(), -1));
+
+ // Verify properties are preserved after cursor reset
+ assertEquals(cursor.getProperties(), properties);
+ }
+
+ @Test
+ public void testTrimmerRaceConditionInNonDurableCursor() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+ config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setRetentionSizeInMB(0);
+
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl)
factory.open("testTrimmerRaceConditionInNonDurableCursor", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.newNonDurableCursor(PositionFactory.EARLIEST);
+
+ // 1. Add Entry 1 (Ledger 1)
+ ledger.addEntry("entry-1".getBytes(Encoding));
+
+ // 2. Ack Entry 1. Verify Persistence with properties.
+ List<Entry> entries = cursor.readEntries(1);
+ assertEquals(entries.size(), 1);
+ Position lastPosition = entries.get(0).getPosition();
+ entries.forEach(Entry::release);
+
+ // Mark delete with properties
+ Map<String, Long> properties = new HashMap<>();
+ properties.put("test-property", 12345L);
+ CountDownLatch latch = new CountDownLatch(1);
+ cursor.asyncMarkDelete(lastPosition, properties, new
MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ fail("Mark delete should succeed");
+ }
+ }, null);
+
+ latch.await();
+
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
+
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
+ assertEquals(cursor.getProperties(), properties);
+
+ // 3. Add Entry 2. Triggers second rollover process.
+ // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger
due to rollover
+ Position p = ledger.addEntry("entry-2".getBytes(Encoding));
+
+ // Wait for background tasks (metadata callback and trim) to complete.
+ // We expect only one ledger (Rollover and trim happened).
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
ledger.getLedgersInfo().size() == 1);
+ // All ledgers are trimmed, left one empty ledger, trim process moves
markDeletedPosition to p.getLedgerId():0
+ assertEquals(cursor.getMarkDeletedPosition(),
PositionFactory.create(p.getLedgerId(), 0));
+ assertEquals(cursor.getPersistentMarkDeletedPosition(),
PositionFactory.create(p.getLedgerId(), 0));
// Verify properties are preserved after cursor reset
assertEquals(cursor.getProperties(), properties);
}
+
+ @Test
+ public void
testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor() throws
Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ int maxEntriesPerLedger = 1;
+ config.setMaxEntriesPerLedger(maxEntriesPerLedger);
+ config.setThrottleMarkDelete(1);
+ config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setRetentionSizeInMB(0);
+
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl)
factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor",
+ config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Position> lastPosition = new AtomicReference<>();
+ ledger.asyncAddEntry("entry-1".getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ lastPosition.set(position);
+ // Mark delete with properties
+ Map<String, Long> properties = new HashMap<>();
+ properties.put("test-property", 12345L);
+ cursor.asyncMarkDelete(position, properties, new
MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ fail("Mark delete should succeed");
+ }
+ }, null);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ fail("Add entry should succeed");
+ }
+ }, null);
+
+ latch.await();
+
+ Map<String, Long> expectedProperties = new HashMap<>();
+ expectedProperties.put("test-property", 12345L);
+
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get());
+ assertEquals(cursor.getProperties(), expectedProperties);
+
+ // 3. Add Entry 2. Triggers second rollover process.
+ // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger
due to rollover
+ Position p2 = ledger.addEntry(("entry-2").getBytes(Encoding));
+
+ // Wait for background tasks (metadata callback) to complete.
+ // We expect at least 2 ledgers (Rollover happened).
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
ledger.getLedgersInfo().size() >= 2);
+ assertEquals(cursor.getMarkDeletedPosition(),
PositionFactory.create(p2.getLedgerId(), -1));
+
+ // Verify properties are preserved after cursor reset
+ assertEquals(cursor.getProperties(), expectedProperties);
+ }
}