This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1adf0427c9dd0e1c2e56ec1a3bb1ad2c2110ca54 Author: Jiwei Guo <[email protected]> AuthorDate: Thu Oct 13 14:46:14 2022 +0800 [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++------------ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 263a3612ceb..230deb27d17 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3274,20 +3274,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { long entriesToSkip = n; long currentLedgerId; long currentEntryId; - if (startRange == PositionBound.startIncluded) { currentLedgerId = startPosition.getLedgerId(); currentEntryId = startPosition.getEntryId(); } else { - // e.g. a mark-delete position PositionImpl nextValidPosition = getNextValidPosition(startPosition); currentLedgerId = nextValidPosition.getLedgerId(); currentEntryId = nextValidPosition.getEntryId(); } - boolean lastLedger = false; long totalEntriesInCurrentLedger; - while (entriesToSkip >= 0) { // for the current ledger, the number of entries written is deduced from the lastConfirmedEntry // for previous ledgers, LedgerInfo in ZK has the number of entries @@ -3302,10 +3298,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { LedgerInfo ledgerInfo = ledgers.get(currentLedgerId); totalEntriesInCurrentLedger = ledgerInfo != null ? ledgerInfo.getEntries() : 0; } - - - long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId; - + long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0 + ? totalEntriesInCurrentLedger - currentEntryId : 0; if (unreadEntriesInCurrentLedger >= entriesToSkip) { // if the current ledger has more entries than what we need to skip // then the return position is in the same ledger @@ -3318,11 +3312,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // there are no more ledgers, return the last position currentEntryId = totalEntriesInCurrentLedger; break; - } else { - Long lid = ledgers.ceilingKey(currentLedgerId + 1); - currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 1); - currentEntryId = 0; } + Long lid = ledgers.ceilingKey(currentLedgerId + 1); + currentLedgerId = lid != null ? lid : ledgers.lastKey(); + currentEntryId = 0; } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 37fa56cd989..7eee82be9dd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2567,6 +2567,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { log.info("Target position is {}", targetPosition); assertEquals(targetPosition.getLedgerId(), secondLedger); assertEquals(targetPosition.getEntryId(), 4); + + // test for n > NumberOfEntriesInStorage + searchPosition = new PositionImpl(secondLedger, 0); + targetPosition = managedLedger.getPositionAfterN(searchPosition, 100, ManagedLedgerImpl.PositionBound.startIncluded); + assertEquals(targetPosition.getLedgerId(), secondLedger); + assertEquals(targetPosition.getEntryId(), 4); + + // test for startPosition > current ledger + searchPosition = new PositionImpl(999, 0); + targetPosition = managedLedger.getPositionAfterN(searchPosition, 0, ManagedLedgerImpl.PositionBound.startIncluded); + assertEquals(targetPosition.getLedgerId(), secondLedger); + assertEquals(targetPosition.getEntryId(), 4); + + searchPosition = new PositionImpl(999, 0); + targetPosition = managedLedger.getPositionAfterN(searchPosition, 10, ManagedLedgerImpl.PositionBound.startExcluded); + assertEquals(targetPosition.getLedgerId(), secondLedger); + assertEquals(targetPosition.getEntryId(), 4); } @Test
