This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ddaa2f Fix process of calculating msgBacklog included in stats (#3092)
0ddaa2f is described below
commit 0ddaa2f92c72cc3c1a19fcb90a6412eb0a3799de
Author: massakam <[email protected]>
AuthorDate: Fri Nov 30 02:35:20 2018 +0900
Fix process of calculating msgBacklog included in stats (#3092)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 31 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4085f80..35cd1b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -694,7 +694,7 @@ public class ManagedCursorImpl implements ManagedCursor {
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
- backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition()));
+ backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
}
return backlog;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d8c48de..6d90016 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.*;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -277,6 +278,36 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+ void testNumberOfEntriesInBacklogWithFallback() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+ ManagedCursor c1 = ledger.openCursor("c1");
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ManagedCursor c2 = ledger.openCursor("c2");
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ ManagedCursor c3 = ledger.openCursor("c3");
+ ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ ManagedCursor c4 = ledger.openCursor("c4");
+ ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+ ManagedCursor c5 = ledger.openCursor("c5");
+
+ Field field = ManagedCursorImpl.class.getDeclaredField("messagesConsumedCounter");
+ field.setAccessible(true);
+ long counter = ((ManagedLedgerImpl) ledger).getEntriesAddedCounter() + 1;
+ field.setLong(c1, counter);
+ field.setLong(c2, counter);
+ field.setLong(c3, counter);
+ field.setLong(c4, counter);
+ field.setLong(c5, counter);
+
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c2.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c3.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c4.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(c5.getNumberOfEntriesInBacklog(), 0);
+ }
+
+ @Test(timeOut = 20000)
void testNumberOfEntriesWithReopen() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));