This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1adf0427c9dd0e1c2e56ec1a3bb1ad2c2110ca54
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 13 14:46:14 2022 +0800

    [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java      | 17 +++++------------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java      | 17 +++++++++++++++++
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 263a3612ceb..230deb27d17 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3274,20 +3274,16 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         long entriesToSkip = n;
         long currentLedgerId;
         long currentEntryId;
-
         if (startRange == PositionBound.startIncluded) {
             currentLedgerId = startPosition.getLedgerId();
             currentEntryId = startPosition.getEntryId();
         } else {
-            // e.g. a mark-delete position
             PositionImpl nextValidPosition = 
getNextValidPosition(startPosition);
             currentLedgerId = nextValidPosition.getLedgerId();
             currentEntryId = nextValidPosition.getEntryId();
         }
-
         boolean lastLedger = false;
         long totalEntriesInCurrentLedger;
-
         while (entriesToSkip >= 0) {
             // for the current ledger, the number of entries written is 
deduced from the lastConfirmedEntry
             // for previous ledgers, LedgerInfo in ZK has the number of entries
@@ -3302,10 +3298,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
                 totalEntriesInCurrentLedger = ledgerInfo != null ? 
ledgerInfo.getEntries() : 0;
             }
-
-
-            long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - 
currentEntryId;
-
+            long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0
+                    ? totalEntriesInCurrentLedger - currentEntryId : 0;
             if (unreadEntriesInCurrentLedger >= entriesToSkip) {
                 // if the current ledger has more entries than what we need to 
skip
                 // then the return position is in the same ledger
@@ -3318,11 +3312,10 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                     // there are no more ledgers, return the last position
                     currentEntryId = totalEntriesInCurrentLedger;
                     break;
-                } else {
-                    Long lid = ledgers.ceilingKey(currentLedgerId + 1);
-                    currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 
1);
-                    currentEntryId = 0;
                 }
+                Long lid = ledgers.ceilingKey(currentLedgerId + 1);
+                currentLedgerId = lid != null ? lid : ledgers.lastKey();
+                currentEntryId = 0;
             }
         }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 37fa56cd989..7eee82be9dd 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2567,6 +2567,23 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         log.info("Target position is {}", targetPosition);
         assertEquals(targetPosition.getLedgerId(), secondLedger);
         assertEquals(targetPosition.getEntryId(), 4);
+
+        // test for n > NumberOfEntriesInStorage
+        searchPosition = new PositionImpl(secondLedger, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 100, 
ManagedLedgerImpl.PositionBound.startIncluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+
+        // test for startPosition > current ledger
+        searchPosition = new PositionImpl(999, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 0, 
ManagedLedgerImpl.PositionBound.startIncluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+
+        searchPosition = new PositionImpl(999, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 10, 
ManagedLedgerImpl.PositionBound.startExcluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
     }
 
     @Test

Reply via email to