This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa408ff fix ttl expiry does not take effect (#12266)
aa408ff is described below
commit aa408fff0244b4e4a4d1a8b1475b50704c422a63
Author: Aloys <[email protected]>
AuthorDate: Sat Oct 9 22:18:16 2021 +0800
fix ttl expiry does not take effect (#12266)
Fixes #12265
### Motivation
fix ttl expiry does not take effect
### Modifications
When `ManagedLedgerImpl.getPositionAfterN`
1. check whether the `currentLeder` contains entry when count the entries
of `currentLeder`
2. check the result and `LAC` first before return
### Verifying this change
add test in `ManagedLedgerTest.testGetPositionAfterN`
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 52 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 47f38a9..5dd98cb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3171,7 +3171,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// for previous ledgers, LedgerInfo in ZK has the number of entries
if (currentLedger != null && currentLedgerId ==
currentLedger.getId()) {
lastLedger = true;
- totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId()
+ 1;
+ if (currentLedgerEntries > 0) {
+ totalEntriesInCurrentLedger =
lastConfirmedEntry.getEntryId() + 1;
+ } else {
+ totalEntriesInCurrentLedger = 0;
+ }
} else {
totalEntriesInCurrentLedger =
ledgers.get(currentLedgerId).getEntries();
}
@@ -3199,6 +3203,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
PositionImpl positionToReturn =
getPreviousPosition(PositionImpl.get(currentLedgerId, currentEntryId));
+ if (positionToReturn.compareTo(lastConfirmedEntry) > 0) {
+ positionToReturn = lastConfirmedEntry;
+ }
if (log.isDebugEnabled()) {
log.debug("getPositionAfterN: Start position {}:{}, startIncluded:
{}, Return position {}:{}",
startPosition.getLedgerId(), startPosition.getEntryId(),
startRange, positionToReturn.getLedgerId(),
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index f8776fd..4358c2f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2182,6 +2182,58 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
assertTrue(mlInfo.stream().allMatch(ledgerInfo ->
ledgerInfo.hasTimestamp()));
}
+
+
+ @Test
+ public void testGetPositionAfterN() throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(5);
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
factory.open("testGetPositionAfterN", managedLedgerConfig);
+
+ // open cursor to prevent ledger to be deleted when ledger rollover
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl)
managedLedger.openCursor("cursor");
+ Position positionMarkDelete = null;
+ for (int i = 0; i < 10; i ++) {
+ if (i == 3) {
+ positionMarkDelete = managedLedger.addEntry(("entry-" +
i).getBytes(Encoding));
+ continue;
+ }
+ managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+
+ managedCursor.markDelete(positionMarkDelete);
+
+ //trigger ledger rollover and wait for the new ledger created
+ managedLedger.rollCurrentLedgerIfFull();
+ Awaitility.await().untilAsserted(() ->
assertEquals(managedLedger.getLedgersInfo().size(), 3));
+ assertEquals(5,
managedLedger.getLedgersInfoAsList().get(0).getEntries());
+ assertEquals(5,
managedLedger.getLedgersInfoAsList().get(1).getEntries());
+ assertEquals(0,
managedLedger.getLedgersInfoAsList().get(2).getEntries());
+ log.info("### ledgers {}", managedLedger.getLedgersInfo());
+
+ long firstLedger = managedLedger.getLedgersInfo().firstKey();
+ long secondLedger =
managedLedger.getLedgersInfoAsList().get(1).getLedgerId();
+
+ PositionImpl startPosition = new PositionImpl(firstLedger, 0);
+
+ PositionImpl targetPosition =
managedLedger.getPositionAfterN(startPosition, 1,
ManagedLedgerImpl.PositionBound.startExcluded);
+ assertEquals(targetPosition.getLedgerId(), firstLedger);
+ assertEquals(targetPosition.getEntryId(), 1);
+
+ targetPosition = managedLedger.getPositionAfterN(startPosition, 4,
ManagedLedgerImpl.PositionBound.startExcluded);
+ assertEquals(targetPosition.getLedgerId(), firstLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+
+ // test for expiry situation
+ PositionImpl searchPosition =
managedLedger.getNextValidPosition((PositionImpl)
managedCursor.getMarkDeletedPosition());
+ long length = managedCursor.getNumberOfEntriesInStorage();
+ // return the last confirm entry position if searchPosition is exceed
the last confirm entry
+ targetPosition = managedLedger.getPositionAfterN(searchPosition,
length, ManagedLedgerImpl.PositionBound.startExcluded);
+ log.info("Target position is {}", targetPosition);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+ }
+
@Test
public void testEstimatedBacklogSize() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testEstimatedBacklogSize");