This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 019c920d178 [fix] [ml] messagesConsumedCounter of NonDurableCursor was
initialized incorrectly (#19355)
019c920d178 is described below
commit 019c920d178776046757846b3ea09b45accbf07b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 31 10:26:06 2023 +0800
[fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized
incorrectly (#19355)
(cherry picked from commit fc9e8bf310185de3685addd439edaee427f532b0)
---
.../bookkeeper/mledger/impl/NonDurableCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/NonDurableCursorTest.java | 15 +++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 918cc22978b..c8908013347 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -75,7 +75,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
// Initialize the counter such that the difference between the
messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
if (null != this.readPosition) {
- long initialBacklog =
readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
+ long initialBacklog =
readPosition.compareTo(lastEntryAndCounter.getLeft()) <= 0
? ledger.getNumberOfEntries(Range.closed(readPosition,
lastEntryAndCounter.getLeft())) : 0;
messagesConsumedCounter = lastEntryAndCounter.getRight() -
initialBacklog;
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 437ca9937af..462a242888c 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -823,5 +823,20 @@ public class NonDurableCursorTest extends
MockedBookKeeperTestCase {
assertEquals(Iterables.size(ledger.getCursors()), 0);
}
+ @Test
+ public void testMessagesConsumedCounterInitializedCorrect() throws
Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testMessagesConsumedCounterInitializedCorrect",
+ new ManagedLedgerConfig().setRetentionTime(1,
TimeUnit.HOURS).setRetentionSizeInMB(1));
+ Position position = ledger.addEntry("1".getBytes(Encoding));
+ NonDurableCursorImpl cursor = (NonDurableCursorImpl)
ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+ cursor.delete(position);
+ assertEquals(cursor.getMessagesConsumedCounter(), 1);
+ assertTrue(cursor.getMessagesConsumedCounter() <=
ledger.getEntriesAddedCounter());
+ // cleanup.
+ cursor.close();
+ ledger.close();
+ }
+
+
private static final Logger log =
LoggerFactory.getLogger(NonDurableCursorTest.class);
}