This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 12712b03f3b61c51646b3d24dd0ce61ad0c189fe Author: Aloys <[email protected]> AuthorDate: Sat Oct 9 22:18:16 2021 +0800 fix ttl expiry does not take effect (#12266) Fixes #12265 ### Motivation fix ttl expiry does not take effect ### Modifications When `ManagedLedgerImpl.getPositionAfterN` 1. check whether the `currentLeder` contains entry when count the entries of `currentLeder` 2. check the result and `LAC` first before return ### Verifying this change add test in `ManagedLedgerTest.testGetPositionAfterN` (cherry picked from commit aa408fff0244b4e4a4d1a8b1475b50704c422a63) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 52 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4cb554c..d93ad09 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3165,7 +3165,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // for previous ledgers, LedgerInfo in ZK has the number of entries if (currentLedger != null && currentLedgerId == currentLedger.getId()) { lastLedger = true; - totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId() + 1; + if (currentLedgerEntries > 0) { + totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId() + 1; + } else { + totalEntriesInCurrentLedger = 0; + } } else { totalEntriesInCurrentLedger = ledgers.get(currentLedgerId).getEntries(); } @@ -3193,6 +3197,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } PositionImpl positionToReturn = getPreviousPosition(PositionImpl.get(currentLedgerId, currentEntryId)); + if (positionToReturn.compareTo(lastConfirmedEntry) > 0) { + positionToReturn = lastConfirmedEntry; + } if (log.isDebugEnabled()) { log.debug("getPositionAfterN: Start position {}:{}, startIncluded: {}, Return position {}:{}", startPosition.getLedgerId(), startPosition.getEntryId(), startRange, positionToReturn.getLedgerId(), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index ba6e877..eda4848 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2126,6 +2126,58 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { assertTrue(mlInfo.stream().allMatch(ledgerInfo -> ledgerInfo.hasTimestamp())); } + + + @Test + public void testGetPositionAfterN() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(5); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetPositionAfterN", managedLedgerConfig); + + // open cursor to prevent ledger to be deleted when ledger rollover + ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor"); + Position positionMarkDelete = null; + for (int i = 0; i < 10; i ++) { + if (i == 3) { + positionMarkDelete = managedLedger.addEntry(("entry-" + i).getBytes(Encoding)); + continue; + } + managedLedger.addEntry(("entry-" + i).getBytes(Encoding)); + } + + managedCursor.markDelete(positionMarkDelete); + + //trigger ledger rollover and wait for the new ledger created + managedLedger.rollCurrentLedgerIfFull(); + Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3)); + assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries()); + assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries()); + assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries()); + log.info("### ledgers {}", managedLedger.getLedgersInfo()); + + long firstLedger = managedLedger.getLedgersInfo().firstKey(); + long secondLedger = managedLedger.getLedgersInfoAsList().get(1).getLedgerId(); + + PositionImpl startPosition = new PositionImpl(firstLedger, 0); + + PositionImpl targetPosition = managedLedger.getPositionAfterN(startPosition, 1, ManagedLedgerImpl.PositionBound.startExcluded); + assertEquals(targetPosition.getLedgerId(), firstLedger); + assertEquals(targetPosition.getEntryId(), 1); + + targetPosition = managedLedger.getPositionAfterN(startPosition, 4, ManagedLedgerImpl.PositionBound.startExcluded); + assertEquals(targetPosition.getLedgerId(), firstLedger); + assertEquals(targetPosition.getEntryId(), 4); + + // test for expiry situation + PositionImpl searchPosition = managedLedger.getNextValidPosition((PositionImpl) managedCursor.getMarkDeletedPosition()); + long length = managedCursor.getNumberOfEntriesInStorage(); + // return the last confirm entry position if searchPosition is exceed the last confirm entry + targetPosition = managedLedger.getPositionAfterN(searchPosition, length, ManagedLedgerImpl.PositionBound.startExcluded); + log.info("Target position is {}", targetPosition); + assertEquals(targetPosition.getLedgerId(), secondLedger); + assertEquals(targetPosition.getEntryId(), 4); + } + @Test public void testEstimatedBacklogSize() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");
