This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f427b97cd9a59a60dee22f1f244baea148885fc1 Author: zhouyifan279 <[email protected]> AuthorDate: Fri Sep 19 02:16:51 2025 +0800 [fix][broker] First entry will be skipped if opening NonDurableCursor while trimmed ledger is adding first entry. (#24738) Co-authored-by: Yunze Xu <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 7a120f310c286eed78701e10c7e6985bc3b6d001) --- .../mledger/impl/NonDurableCursorImpl.java | 5 +- .../mledger/impl/NonDurableCursorTest.java | 58 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index c2398aaf6ec..96906ddcbb0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -45,11 +45,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to // store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require // both ledgerId and entryId to be Long.max() - if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) { + Pair<Position, Long> lastPositionCounter = ledger.getLastPositionAndCounter(); + if (startCursorPosition == null || startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) { // Start from last entry switch (initialPosition) { case Latest: - initializeCursorPosition(ledger.getLastPositionAndCounter()); + initializeCursorPosition(lastPositionCounter); break; case Earliest: initializeCursorPosition(ledger.getFirstPositionAndCounter()); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 0bb1b0fc972..79ab89fa6f4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -50,6 +51,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -103,6 +109,58 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test(timeOut = 20000) + void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1) + .setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + @Cleanup + ManagedLedgerImpl ledgerSpy = + Mockito.spy((ManagedLedgerImpl) factory.open("non_durable_cursor_while_ledger_trimmed", config)); + + ledgerSpy.addEntry("message1".getBytes()); + + ledgerSpy.rollCurrentLedgerIfFull(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> + ledgerSpy.getLedgersInfoAsList().size() > 1 + ); + CompletableFuture<Void> trimFuture = new CompletableFuture<>(); + ledgerSpy.trimConsumedLedgersInBackground(trimFuture); + trimFuture.join(); + + // Use (currentLedgerId, -1) as startCursorPosition after ledger was trimmed + Position startCursorPosition = PositionFactory.create(ledgerSpy.getCurrentLedger().getId(), -1); + assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry) > 0); + + CountDownLatch getLastPositionLatch = new CountDownLatch(1); + CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1); + Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position, Long>>) invocation -> { + newNonDurableCursorLatch.countDown(); + getLastPositionLatch.await(); + return Pair.of(ledgerSpy.lastConfirmedEntry, ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy)); + }); + + CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<ManagedCursor>() + .completeAsync(() -> + new NonDurableCursorImpl(bkc, ledgerSpy, "my_test_cursor", + startCursorPosition, CommandSubscribe.InitialPosition.Latest, false) + ); + Position oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry; + + // Wait until NonDurableCursorImpl constructor invokes ManagedLedgerImpl.getLastPositionAndCounter + newNonDurableCursorLatch.await(); + // Add first entry after ledger was trimmed + ledgerSpy.addEntry("message2".getBytes()); + assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0); + + // Unblock NonDurableCursorImpl constructor + getLastPositionLatch.countDown(); + + // cursor should read from lastConfirmedEntry + ManagedCursor cursor = cursorFuture.join(); + assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry); + } + @Test(timeOut = 20000) void testZNodeBypassed() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger");
