This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f427b97cd9a59a60dee22f1f244baea148885fc1
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Sep 19 02:16:51 2025 +0800

    [fix][broker] First entry will be skipped if opening NonDurableCursor while trimmed ledger is adding first entry. (#24738)
    
    Co-authored-by: Yunze Xu <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 7a120f310c286eed78701e10c7e6985bc3b6d001)
---
 .../mledger/impl/NonDurableCursorImpl.java         |  5 +-
 .../mledger/impl/NonDurableCursorTest.java         | 58 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index c2398aaf6ec..96906ddcbb0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -45,11 +45,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl 
{
         // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
         // both ledgerId and entryId to be Long.max()
-        if (startCursorPosition == null || 
startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
+        Pair<Position, Long> lastPositionCounter = 
ledger.getLastPositionAndCounter();
+        if (startCursorPosition == null || 
startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    
initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(lastPositionCounter);
                     break;
                 case Earliest:
                     
initializeCursorPosition(ledger.getFirstPositionAndCounter());
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 0bb1b0fc972..79ab89fa6f4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -50,6 +51,11 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -103,6 +109,58 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test(timeOut = 20000)
+    void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed() 
throws Exception {
+        ManagedLedgerConfig config = new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1)
+                .setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledgerSpy =
+                Mockito.spy((ManagedLedgerImpl) 
factory.open("non_durable_cursor_while_ledger_trimmed", config));
+
+        ledgerSpy.addEntry("message1".getBytes());
+
+        ledgerSpy.rollCurrentLedgerIfFull();
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
+                ledgerSpy.getLedgersInfoAsList().size() > 1
+        );
+        CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+        ledgerSpy.trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.join();
+
+        // Use (currentLedgerId, -1) as startCursorPosition after ledger was 
trimmed
+        Position startCursorPosition = 
PositionFactory.create(ledgerSpy.getCurrentLedger().getId(), -1);
+        assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry) 
> 0);
+
+        CountDownLatch getLastPositionLatch = new CountDownLatch(1);
+        CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1);
+        
Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position, 
Long>>) invocation -> {
+            newNonDurableCursorLatch.countDown();
+            getLastPositionLatch.await();
+            return Pair.of(ledgerSpy.lastConfirmedEntry, 
ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy));
+        });
+
+        CompletableFuture<ManagedCursor> cursorFuture = new 
CompletableFuture<ManagedCursor>()
+                .completeAsync(() ->
+                        new NonDurableCursorImpl(bkc, ledgerSpy, 
"my_test_cursor",
+                                startCursorPosition, 
CommandSubscribe.InitialPosition.Latest, false)
+                );
+        Position oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry;
+
+        // Wait until NonDurableCursorImpl constructor invokes 
ManagedLedgerImpl.getLastPositionAndCounter
+        newNonDurableCursorLatch.await();
+        // Add first entry after ledger was trimmed
+        ledgerSpy.addEntry("message2".getBytes());
+        
assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0);
+
+        // Unblock NonDurableCursorImpl constructor
+        getLastPositionLatch.countDown();
+
+        // cursor should read from lastConfirmedEntry
+        ManagedCursor cursor = cursorFuture.join();
+        assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry);
+    }
+
     @Test(timeOut = 20000)
     void testZNodeBypassed() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");

Reply via email to