This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed31d82ccca [fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
ed31d82ccca is described below

commit ed31d82ccca0fed50a3977786f5b7ac306af7dbc
Author: fengyubiao <[email protected]>
AuthorDate: Mon Nov 17 03:24:48 2025 +0800

    [fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  12 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  98 ++++--
 .../mledger/impl/ManagedCursorListAckTest.java     |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 331 +++++++++++++++++++++
 .../client/api/SimpleProducerConsumerTest.java     |  47 +++
 5 files changed, 467 insertions(+), 23 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d32f0c8e998..e1ee50aad0f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1489,12 +1489,16 @@ public class ManagedCursorImpl implements ManagedCursor 
{
                 // modify mark delete and read position since we are able to 
persist new position for cursor
                 lock.writeLock().lock();
                 try {
-                    if (markDeletePosition.compareTo(newMarkDeletePosition) >= 
0) {
+                    // Correct the variable "messagesConsumedCounter".
+                    // BTW, no need to change "messagesConsumedCounter" if new 
"markDeletePosition" is the same as the
+                    // old one.
+                    int compareRes = 
ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
+                    if (compareRes > 0) {
                         MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), 
-getNumberOfEntries(
-                                Range.closedOpen(newMarkDeletePosition, 
markDeletePosition)));
-                    } else {
+                                Range.openClosed(newMarkDeletePosition, 
markDeletePosition)));
+                    } else if (compareRes < 0) {
                         MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), 
getNumberOfEntries(
-                                Range.closedOpen(markDeletePosition, 
newMarkDeletePosition)));
+                                Range.openClosed(markDeletePosition, 
newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
                     lastMarkDeleteEntry = new 
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 17ffecb5617..a42974f9b2c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3868,6 +3868,32 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 });
     }
 
+    /**
+     * Compare two positions. It differs from {@link 
Position#compareTo(Position)} when the params are invalid.
+     * For example: if position-1 is "1:{latest entry}" and position-2 is 
"2:-1", they are treated as the same position.
+     */
+    @VisibleForTesting
+    int comparePositions(Position pos1, Position pos2) {
+        if (pos1 == null || pos2 == null) {
+            throw new IllegalArgumentException("Positions must not be null");
+        }
+        if (ledgers.isEmpty() || pos1.getLedgerId() < 
getFirstPosition().getLedgerId()
+                || pos2.getLedgerId() < getFirstPosition().getLedgerId()
+                || pos1.getLedgerId() > getLastPosition().getLedgerId()
+                || pos2.getLedgerId() > getLastPosition().getLedgerId()) {
+            log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, 
pos2,
+                    new IllegalArgumentException("Comparing un-exist 
position"));
+            return pos1.compareTo(pos2);
+        }
+        if (pos1.getLedgerId() == pos2.getLedgerId()) {
+            return Long.compare(pos1.getEntryId(), pos2.getEntryId());
+        }
+        if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
+            return 
getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
+        }
+        return pos1.compareTo(pos2);
+    }
+
     /**
      * Get the number of entries between a contiguous range of two positions.
      *
@@ -3880,41 +3906,77 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
         Position toPosition = range.upperEndpoint();
         boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+        if (comparePositions(fromPosition, toPosition) > 0) {
+            log.warn("[{}] Getting number of entries with an invalid range {} 
and {}", name, fromPosition, toPosition);
+            throw new IllegalArgumentException("Invalid range " + range);
+        }
+
+        // 1. If the "fromPosition" is after "toPosition", then there is no 
entry in the range.
+        // 2. If both "fromPosition" and "toPosition" have negative entry ids 
and are in the same ledger, then there is no
+        //    entry in the range.
+        if (fromPosition.getLedgerId() > toPosition.getLedgerId()
+            || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+                && fromPosition.getEntryId() > toPosition.getEntryId())
+            || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+                && fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 
0)) {
+            return 0;
+        }
 
+        // If the 2 positions are in the same ledger.
         if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
             LedgerInfo li = ledgers.get(toPosition.getLedgerId());
             if (li != null) {
                 // If the 2 positions are in the same ledger
                 long count = toPosition.getEntryId() - 
fromPosition.getEntryId() - 1;
-                count += fromIncluded ? 1 : 0;
-                count += toIncluded ? 1 : 0;
+                count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 
0;
+                count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
                 return count;
             } else {
                 // if the ledgerId is not in the ledgers, it means it has been 
deleted
                 return 0;
             }
-        } else {
-            long count = 0;
-            // If the from & to are pointing to different ledgers, then we 
need to :
-            // 1. Add the entries in the ledger pointed by toPosition
-            count += toPosition.getEntryId();
+        }
+
+        // If the "fromPosition.ledgerId" is smaller than "toPosition.ledgerId".
+        // 1. Add the entries in the ledger pointed by toPosition.
+        // 2. Add the entries in the ledger pointed by fromPosition.
+        // 3. Add the whole ledgers entries in between.
+        long count = 0;
+
+        // 1. Add the entries in the ledger pointed by toPosition.
+        //    Add nothing if "toPosition" does not exist in "ledgers".
+        //    Add nothing if "toPosition.entryId < 0".
+        LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
+        if (toPosition.getEntryId() >= 0 && toLedger != null) {
+            // To support the use case "cursor.getNumberOfEntries()", which 
will use a "toPosition" that is larger
+            // than the LAC.
+            // To support this case, use "Long.MAX_VALUE" if the ledger is the 
last one.
+            long entriesInLedger = comparePositions(toPosition, 
lastConfirmedEntry) >= 0
+                    ? Long.MAX_VALUE : toLedger.getEntries();
+            count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
             count += toIncluded ? 1 : 0;
+        }
 
-            // 2. Add the entries in the ledger pointed by fromPosition
-            LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
-            if (li != null) {
-                count += li.getEntries() - (fromPosition.getEntryId() + 1);
+        // 2. Add the entries in the ledger pointed by fromPosition.
+        //    Add all entries of the ledger if "fromPosition.entryId < 0".
+        //    Add nothing if "fromPosition" does not exist in "ledgers".
+        LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
+        if (formLedger != null) {
+            if (fromPosition.getEntryId() < 0) {
+                count += formLedger.getEntries();
+            } else {
+                count += formLedger.getEntries() - (fromPosition.getEntryId() 
+ 1);
                 count += fromIncluded ? 1 : 0;
             }
+        }
 
-            // 3. Add the whole ledgers entries in between
-            for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), 
false, toPosition.getLedgerId(), false)
-                    .values()) {
-                count += ls.getEntries();
-            }
-
-            return count;
+        // 3. Add the whole ledgers entries in between
+        for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, 
toPosition.getLedgerId(), false)
+                .values()) {
+            count += ls.getEntries();
         }
+
+        return count;
     }
 
     /**
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
index c4d3b076ba3..a4895b2624b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends 
MockedBookKeeperTestCase {
 
     private static final Charset Encoding = StandardCharsets.UTF_8;
 
-    @Test(timeOut = 20000)
+    @Test(timeOut = 20000 * 1000)
     void testMultiPositionDelete() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(2));
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index eb65cbd2c22..9216bd60ed4 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4774,4 +4774,335 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         cursor.close();
         ledger.close();
     }
+
+    @Test
+    public void testGetNumberOfEntriesWithRangeParam() throws Exception {
+        final String ledgerName = "ml_" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        final String cursorName = "test-cursor";
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, 
config);
+        // Create a cursor to avoid entries being trimmed.
+        ml.openCursor(cursorName);
+        int totalEntries = 35;
+        List<Position> positions = new ArrayList<>(totalEntries);
+        for (int i = 0; i < totalEntries; i++) {
+            Position pos = ml.addEntry(("entry-" + i).getBytes());
+            positions.add(pos);
+        }
+        Iterator<LedgerInfo> iterator = 
ml.getLedgersInfo().values().iterator();
+        LedgerInfo ledger1 = iterator.next();
+        LedgerInfo ledger2 = iterator.next();
+        LedgerInfo ledger3 = iterator.next();
+        LedgerInfo ledger4 = iterator.next();
+        assertEquals(ledger1.getEntries(), 10);
+        assertEquals(ledger2.getEntries(), 10);
+        assertEquals(ledger3.getEntries(), 10);
+        assertEquals(ledger4.getLedgerId(), ml.getCurrentLedger().getId());
+
+        // Normal case: same ledger.
+        Range<Position> range11 = Range.closed(positions.get(0), 
positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range11), 10);
+        Range<Position> range12 = Range.openClosed(positions.get(1), 
positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range12), 8);
+        Range<Position> range13 = Range.closedOpen(positions.get(2), 
positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range13), 7);
+
+        // Normal case: crosses ledgers.
+        Range<Position> range21 = Range.closed(positions.get(0), 
positions.get(19));
+        assertEquals(ml.getNumberOfEntries(range21), 20);
+        Range<Position> range22 = Range.openClosed(positions.get(0), 
positions.get(19));
+        assertEquals(ml.getNumberOfEntries(range22), 19);
+        Range<Position> range23 = Range.closedOpen(positions.get(0), 
positions.get(19));
+        assertEquals(ml.getNumberOfEntries(range23), 19);
+        Range<Position> range24 = Range.closed(positions.get(0), 
positions.get(29));
+        assertEquals(ml.getNumberOfEntries(range24), 30);
+        Range<Position> range25 = Range.openClosed(positions.get(0), 
positions.get(29));
+        assertEquals(ml.getNumberOfEntries(range25), 29);
+        Range<Position> range26 = Range.closedOpen(positions.get(0), 
positions.get(29));
+        assertEquals(ml.getNumberOfEntries(range26), 29);
+
+        // Normal case: end with current ledger.
+        Range<Position> range27 = Range.closed(positions.get(0), 
positions.get(34));
+        assertEquals(ml.getNumberOfEntries(range27), 35);
+        // Cover the following case.
+        // The use case "cursor.getNumberOfEntries()", which will use a 
"toPosition" with an entry
+        // id that is larger than the LAC.
+        Range<Position> range28 = Range.closed(positions.get(0), 
PositionFactory.create(ledger4.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range28), 131);
+
+        // From position that entry id is "-1" & positions in the same ledger.
+        Range<Position> range31 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range31), 10);
+        Range<Position> range32 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range32), 10);
+        Range<Position> range33 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(9));
+        assertEquals(ml.getNumberOfEntries(range33), 9);
+
+        // From position that entry id is "-1" & crosses ledgers.
+        Range<Position> range41 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(15));
+        assertEquals(ml.getNumberOfEntries(range41), 16);
+        Range<Position> range42 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(15));
+        assertEquals(ml.getNumberOfEntries(range42), 16);
+        Range<Position> range43 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(15));
+        assertEquals(ml.getNumberOfEntries(range43), 15);
+        Range<Position> range44 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(25));
+        assertEquals(ml.getNumberOfEntries(range44), 26);
+        Range<Position> range45 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(25));
+        assertEquals(ml.getNumberOfEntries(range45), 26);
+        Range<Position> range46 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+                positions.get(25));
+        assertEquals(ml.getNumberOfEntries(range46), 25);
+
+        // Invalid range.
+        try {
+            Range.closed(positions.get(1), 
PositionFactory.create(ledger1.getLedgerId(), -1));
+            fail("Should have failed because the range is invalid.");
+        } catch (IllegalArgumentException ex) {
+            assertTrue(ex.getMessage().contains("Invalid range"));
+        }
+        try {
+            Range.closed(positions.get(29), positions.get(0));
+            fail("Should have failed because the range is invalid.");
+        } catch (IllegalArgumentException ex) {
+            assertTrue(ex.getMessage().contains("Invalid range"));
+        }
+
+        // "To position" that entry id is "-1" & crosses ledgers.
+        Range<Position> range61 = Range.closed(positions.get(1), 
PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range61), 9);
+        Range<Position> range62 = Range.closedOpen(positions.get(1), 
PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range62), 9);
+        Range<Position> range63 = Range.openClosed(positions.get(1), 
PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range63), 8);
+        Range<Position> range64 = Range.closed(positions.get(1), 
PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range64), 19);
+        Range<Position> range65 = Range.closedOpen(positions.get(1), 
PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range65), 19);
+        Range<Position> range66 = Range.openClosed(positions.get(1), 
PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range66), 18);
+
+        // "From position" is the latest entry of a ledger.
+        Range<Position> range71 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), 
positions.get(10));
+        assertEquals(ml.getNumberOfEntries(range71), 2);
+        Range<Position> range72 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), 
positions.get(10));
+        assertEquals(ml.getNumberOfEntries(range72), 1);
+        Range<Position> range73 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), 
positions.get(10));
+        assertEquals(ml.getNumberOfEntries(range73), 1);
+
+        // "From position" is the latest entry of a ledger, and "to position" 
has a negative entry id.
+        Range<Position> range81 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range81), 1);
+        Range<Position> range82 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range82), 0);
+        Range<Position> range83 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range83), 1);
+
+        // "From position" is the latest entry of a ledger, and "to position" 
has a negative entry id & crosses ledgers.
+        Range<Position> range91 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range91), 11);
+        Range<Position> range92 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range92), 10);
+        Range<Position> range93 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range93), 11);
+
+        // "To Position" is larger than LAC.
+        Range<Position> range101 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range101), 21);
+        Range<Position> range102 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range102), 20);
+        Range<Position> range103 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range103), 20);
+
+        // "From position" is smaller than the first one.
+        Range<Position> range111 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range111), 30);
+        Range<Position> range112 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range112), 30);
+        Range<Position> range113 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+                PositionFactory.create(ledger3.getLedgerId(), 100));
+        assertEquals(ml.getNumberOfEntries(range113), 29);
+
+        // Both "fromPosition" and "toPosition" have negative entry id & in 
the same ledger.
+        Range<Position> range121 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                PositionFactory.create(ledger1.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range121), 0);
+        Range<Position> range122 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger1.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range122), 0);
+        // Both "fromPosition" and "toPosition" have negative entry id & 
crosses ledgers.
+        Range<Position> range123 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range123), 10);
+        Range<Position> range124 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range124), 20);
+        Range<Position> range125 = 
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1000));
+        assertEquals(ml.getNumberOfEntries(range125), 20);
+        Range<Position> range126 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range126), 10);
+        Range<Position> range127 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range127), 20);
+        Range<Position> range128 = 
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1000));
+        assertEquals(ml.getNumberOfEntries(range128), 20);
+        Range<Position> range129 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+                PositionFactory.create(ledger2.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range129), 10);
+        Range<Position> range1210 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1));
+        assertEquals(ml.getNumberOfEntries(range1210), 20);
+        Range<Position> range1211 = 
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+                PositionFactory.create(ledger3.getLedgerId(), -1000));
+        assertEquals(ml.getNumberOfEntries(range1211), 20);
+        try {
+            Range.openClosed(PositionFactory.create(ledger2.getLedgerId(), 
-10),
+            PositionFactory.create(ledger1.getLedgerId(), -1));
+            fail("Should have failed because the range is invalid.");
+        } catch (IllegalArgumentException ex) {
+            assertTrue(ex.getMessage().contains("Invalid range"));
+        }
+
+        // Cover the following case.
+        // The use case "cursor.getNumberOfEntries()", which will use a 
"toPosition" with an entry
+        // id that is larger than the LAC.
+        // The difference with above one: the LAC is not in the latest ledger.
+        ml.close();
+        ManagedLedgerImpl ml2 =  (ManagedLedgerImpl) factory.open(ledgerName, 
config);
+        assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId());
+        Range<Position> range131 = Range.closed(positions.get(0), 
PositionFactory.create(ledger4.getLedgerId(), 100));
+        assertEquals(ml2.getNumberOfEntries(range131), 131);
+        Range<Position> range132 = Range.openClosed(positions.get(0), 
PositionFactory.create(ledger4.getLedgerId(),
+            100));
+        assertEquals(ml2.getNumberOfEntries(range132), 130);
+        Range<Position> range133 = Range.closedOpen(positions.get(0), 
PositionFactory.create(ledger4.getLedgerId(),
+            100));
+        assertEquals(ml2.getNumberOfEntries(range133), 130);
+
+        // cleanup.
+        ml2.delete();
+    }
+
+    @Test
+    public void testComparePositions() throws Exception {
+        final String ledgerName = "ml_" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        final String cursorName = "test-cursor";
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, 
config);
+        // Create a cursor to avoid entries being trimmed.
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.openCursor(cursorName);
+        int totalEntries = 30;
+        List<Position> positions = new ArrayList<>(totalEntries);
+        for (int i = 0; i < totalEntries; i++) {
+            Position pos = ml.addEntry(("entry-" + i).getBytes());
+            positions.add(pos);
+        }
+        Iterator<LedgerInfo> iterator = 
ml.getLedgersInfo().values().iterator();
+        LedgerInfo ledger1 = iterator.next();
+        LedgerInfo ledger2 = iterator.next();
+        LedgerInfo ledger3 = iterator.next();
+        assertEquals(ledger1.getEntries(), 10);
+        assertEquals(ledger2.getEntries(), 10);
+
+        // Normal case: pos1 == pos2.
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(0)), 
0);
+        assertEquals(ml.comparePositions(positions.get(9), positions.get(9)), 
0);
+        assertEquals(ml.comparePositions(positions.get(29), 
positions.get(29)), 0);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), 
-1),
+                PositionFactory.create(ledger2.getLedgerId(), -1)), 0);
+
+        // Normal case: pos1 < pos2.
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(1)), 
-1);
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(9)), 
-1);
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(10)), 
-1);
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(19)), 
-1);
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(20)), 
-1);
+        assertEquals(ml.comparePositions(positions.get(0), positions.get(29)), 
-1);
+
+        // Normal case: pos1 > pos2.
+        assertEquals(ml.comparePositions(positions.get(1), positions.get(0)), 
1);
+        assertEquals(ml.comparePositions(positions.get(9), positions.get(0)), 
1);
+        assertEquals(ml.comparePositions(positions.get(10), positions.get(0)), 
1);
+        assertEquals(ml.comparePositions(positions.get(19), positions.get(0)), 
1);
+        assertEquals(ml.comparePositions(positions.get(20), positions.get(0)), 
1);
+        assertEquals(ml.comparePositions(positions.get(29), positions.get(0)), 
1);
+
+        // Pos1 has negative entry id & both positions in the same ledger.
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), 
-1),
+                positions.get(0)), -1);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), 
-1),
+                positions.get(10)), -1);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), 
-1),
+                positions.get(20)), -1);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), 
-1),
+                positions.get(0)), -1);
+        // Pos1 has negative entry id & crosses ledgers.
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), 
-1),
+                positions.get(0)), 1);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), 
-1),
+                positions.get(0)), 1);
+        // Pos1 has negative entry id & the same value.
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), 
-1), positions.get(9)),
+                0);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), 
-1), positions.get(19)),
+                0);
+
+        // Pos2 has negative entry id & both positions in the same ledger.
+        assertEquals(ml.comparePositions(positions.get(0), 
PositionFactory.create(ledger1.getLedgerId(), -1)),
+                 1);
+        assertEquals(ml.comparePositions(positions.get(10), 
PositionFactory.create(ledger2.getLedgerId(), -1)),
+                 1);
+        assertEquals(ml.comparePositions(positions.get(20), 
PositionFactory.create(ledger3.getLedgerId(), -1)),
+                 1);
+        assertEquals(ml.comparePositions(positions.get(0), 
PositionFactory.create(ledger1.getLedgerId(), -1)),
+                 1);
+        // Pos2 has negative entry id & crosses ledgers.
+        assertEquals(ml.comparePositions(positions.get(0), 
PositionFactory.create(ledger2.getLedgerId(), -1)),
+                -1);
+        assertEquals(ml.comparePositions(positions.get(0), 
PositionFactory.create(ledger3.getLedgerId(), -1)),
+                -1);
+        // Pos2 has negative entry id & the same value.
+        assertEquals(ml.comparePositions(positions.get(9), 
PositionFactory.create(ledger2.getLedgerId(), -1)),
+                0);
+        assertEquals(ml.comparePositions(positions.get(19), 
PositionFactory.create(ledger3.getLedgerId(), -1)),
+                0);
+
+        // Pos1 does not exist in ledgers.
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 
1, 100),
+                        positions.get(0)), -1);
+        
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 
1, 0),
+                        positions.get(29)), 1);
+
+        // Pos2 does not exist in ledgers.
+        assertEquals(ml.comparePositions(positions.get(0),
+                        PositionFactory.create(ledger1.getLedgerId() - 1, 
100)), 1);
+        assertEquals(ml.comparePositions(positions.get(29),
+                PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1);
+
+        // cleanup.
+        ml.delete();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 39f5a62306e..b76328f252f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -81,6 +81,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -5386,4 +5387,50 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @DataProvider
+    public Object[][] trimLedgerBeforeGetStats() {
+        return new Object[][] {
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "trimLedgerBeforeGetStats")
+    public void testBacklogAfterCreatedSubscription(boolean 
trimLegderBeforeGetStats) throws Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        String mlName = TopicName.get(topic).getPersistenceNamingEncoding();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+        if (!trimLegderBeforeGetStats) {
+            config.setRetentionTime(3600, TimeUnit.SECONDS);
+        }
+        ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, 
config);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 4; i++) {
+            producer.send("message-" + i);
+            Thread.sleep(1000);
+        }
+        producer.close();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+        assertEquals(persistentTopic.getManagedLedger(), ml);
+
+        if (trimLegderBeforeGetStats) {
+            CompletableFuture<Void> trimLedgerFuture = new 
CompletableFuture<>();
+            ml.trimConsumedLedgersInBackground(trimLedgerFuture);
+            trimLedgerFuture.join();
+            assertEquals(ml.getLedgersInfo().size(), 1);
+            assertEquals(ml.getCurrentLedgerEntries(), 0);
+        }
+
+        admin.topics().createSubscription(topic, "sub1", MessageId.latest);
+        
assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(),
 0);
+
+        // cleanup
+        admin.topics().delete(topic, false);
+    }
 }

Reply via email to