This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 019c920d178 [fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (#19355)
019c920d178 is described below

commit 019c920d178776046757846b3ea09b45accbf07b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 31 10:26:06 2023 +0800

    [fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (#19355)
    
    (cherry picked from commit fc9e8bf310185de3685addd439edaee427f532b0)
---
 .../bookkeeper/mledger/impl/NonDurableCursorImpl.java     |  2 +-
 .../bookkeeper/mledger/impl/NonDurableCursorTest.java     | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 918cc22978b..c8908013347 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -75,7 +75,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         // Initialize the counter such that the difference between the messages written on the ML and the
         // messagesConsumed is equal to the current backlog (negated).
         if (null != this.readPosition) {
-            long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
+            long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) <= 0
                 ? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
             messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 437ca9937af..462a242888c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -823,5 +823,20 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         assertEquals(Iterables.size(ledger.getCursors()), 0);
     }
 
+    @Test
+    public void testMessagesConsumedCounterInitializedCorrect() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testMessagesConsumedCounterInitializedCorrect",
+                        new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
+        Position position = ledger.addEntry("1".getBytes(Encoding));
+        NonDurableCursorImpl cursor = (NonDurableCursorImpl) ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+        cursor.delete(position);
+        assertEquals(cursor.getMessagesConsumedCounter(), 1);
+        assertTrue(cursor.getMessagesConsumedCounter() <= ledger.getEntriesAddedCounter());
+        // cleanup.
+        cursor.close();
+        ledger.close();
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class);
 }

Reply via email to