This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6be74f1adaf [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
6be74f1adaf is described below

commit 6be74f1adaf1b6d75a5072ea764cf376fdb02694
Author: Cong Zhao <zhaoc...@apache.org>
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

    [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
    
    (cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a9605830de..d36b85aa10a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3646,7 +3646,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             Long nextLedgerId = 
ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
             // This means it has jumped to the last position
             if (nextLedgerId == null) {
-                if (currentLedgerEntries == 0) {
+                if (currentLedgerEntries == 0 && currentLedger != null) {
                     return PositionImpl.get(currentLedger.getId(), 0);
                 }
                 return lastConfirmedEntry.getNext();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
     }
 
+    @Test
+    public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+        String mlName = "my_test_ledger";
+        String cursorName = "c1";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+        // Write some data.
+        Position p0 = ledger.addEntry("entry-0".getBytes());
+        Position p1 = ledger.addEntry("entry-1".getBytes());
+
+        // Read message.
+        List<Entry> entries = c1.readEntries(2);
+        assertEquals(entries.size(), 2);
+        assertEquals(entries.get(0).getPosition(), p0);
+        assertEquals(entries.get(1).getPosition(), p1);
+        entries.forEach(Entry::release);
+
+        // Mark delete the last message.
+        c1.markDelete(p1);
+        Position markDeletedPosition = c1.getMarkDeletedPosition();
+        Assert.assertEquals(markDeletedPosition, p1);
+
+        // Terminate the managed ledger.
+        Position lastPosition = ledger.terminate();
+        assertEquals(lastPosition, p1);
+
+        // Close the ledger.
+        ledger.close();
+
+        // Reopen the ledger.
+        ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        BookKeeper mockBookKeeper = mock(BookKeeper.class);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+                cursorName);
+
+        CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+        // Recover the cursor.
+        cursor.recover(new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                recoverFuture.complete(null);
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                recoverFuture.completeExceptionally(exception);
+            }
+        });
+
+        recoverFuture.join();
+        assertTrue(recoverFuture.isDone());
+        assertFalse(recoverFuture.isCompletedExceptionally());
+
+        // Verify the cursor state.
+        assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+        assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }

Reply via email to