This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 12712b03f3b61c51646b3d24dd0ce61ad0c189fe
Author: Aloys <[email protected]>
AuthorDate: Sat Oct 9 22:18:16 2021 +0800

    fix ttl expiry does not take effect (#12266)
    
    Fixes #12265
    
    ### Motivation
    fix ttl expiry does not take effect
    
    ### Modifications
    When `ManagedLedgerImpl.getPositionAfterN`
    1. check whether the  `currentLeder` contains entry when count the entries 
of `currentLeder`
    2. check the result and `LAC` first before return
    
    ### Verifying this change
    add test in `ManagedLedgerTest.testGetPositionAfterN`
    
    (cherry picked from commit aa408fff0244b4e4a4d1a8b1475b50704c422a63)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 52 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4cb554c..d93ad09 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3165,7 +3165,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // for previous ledgers, LedgerInfo in ZK has the number of entries
             if (currentLedger != null && currentLedgerId == 
currentLedger.getId()) {
                 lastLedger = true;
-                totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId() 
+ 1;
+                if (currentLedgerEntries > 0) {
+                    totalEntriesInCurrentLedger = 
lastConfirmedEntry.getEntryId() + 1;
+                } else {
+                    totalEntriesInCurrentLedger = 0;
+                }
             } else {
                 totalEntriesInCurrentLedger = 
ledgers.get(currentLedgerId).getEntries();
             }
@@ -3193,6 +3197,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         PositionImpl positionToReturn = 
getPreviousPosition(PositionImpl.get(currentLedgerId, currentEntryId));
+        if (positionToReturn.compareTo(lastConfirmedEntry) > 0) {
+            positionToReturn = lastConfirmedEntry;
+        }
         if (log.isDebugEnabled()) {
             log.debug("getPositionAfterN: Start position {}:{}, startIncluded: 
{}, Return position {}:{}",
                     startPosition.getLedgerId(), startPosition.getEntryId(), 
startRange, positionToReturn.getLedgerId(),
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ba6e877..eda4848 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2126,6 +2126,58 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         assertTrue(mlInfo.stream().allMatch(ledgerInfo -> 
ledgerInfo.hasTimestamp()));
     }
 
+
+
+    @Test
+    public void testGetPositionAfterN() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
factory.open("testGetPositionAfterN", managedLedgerConfig);
+
+        // open cursor to prevent ledger to be deleted when ledger rollover
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
managedLedger.openCursor("cursor");
+        Position positionMarkDelete = null;
+        for (int i = 0; i < 10; i ++) {
+            if (i == 3) {
+                positionMarkDelete = managedLedger.addEntry(("entry-" + 
i).getBytes(Encoding));
+                continue;
+            }
+            managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        managedCursor.markDelete(positionMarkDelete);
+
+        //trigger ledger rollover and wait for the new ledger created
+        managedLedger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(managedLedger.getLedgersInfo().size(), 3));
+        assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(0).getEntries());
+        assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(1).getEntries());
+        assertEquals(0, 
managedLedger.getLedgersInfoAsList().get(2).getEntries());
+        log.info("### ledgers {}", managedLedger.getLedgersInfo());
+
+        long firstLedger = managedLedger.getLedgersInfo().firstKey();
+        long secondLedger = 
managedLedger.getLedgersInfoAsList().get(1).getLedgerId();
+
+        PositionImpl startPosition = new PositionImpl(firstLedger, 0);
+
+        PositionImpl targetPosition = 
managedLedger.getPositionAfterN(startPosition, 1, 
ManagedLedgerImpl.PositionBound.startExcluded);
+        assertEquals(targetPosition.getLedgerId(), firstLedger);
+        assertEquals(targetPosition.getEntryId(), 1);
+
+        targetPosition = managedLedger.getPositionAfterN(startPosition, 4, 
ManagedLedgerImpl.PositionBound.startExcluded);
+        assertEquals(targetPosition.getLedgerId(), firstLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+
+        // test for expiry situation
+        PositionImpl searchPosition = 
managedLedger.getNextValidPosition((PositionImpl) 
managedCursor.getMarkDeletedPosition());
+        long length = managedCursor.getNumberOfEntriesInStorage();
+        // return the last confirm entry position if searchPosition is exceed 
the last confirm entry
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 
length, ManagedLedgerImpl.PositionBound.startExcluded);
+        log.info("Target position is {}", targetPosition);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+    }
+
     @Test
     public void testEstimatedBacklogSize() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testEstimatedBacklogSize");

Reply via email to