merlimat commented on a change in pull request #7236:
URL: https://github.com/apache/pulsar/pull/7236#discussion_r439014421
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
##########
@@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
ledger.close();
}
+ @Test
+ public void testBacklogStatsWhenDroppingData() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testBacklogStatsWhenDroppingData",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ ManagedCursor c1 = ledger.openCursor("c1");
+ ManagedCursor nonDurableCursor =
ledger.newNonDurableCursor(PositionImpl.earliest);
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+ List<Position> positions = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+ }
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+ c1.markDelete(positions.get(4));
+ assertEquals(c1.getNumberOfEntries(), 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+ // Since the durable cursor has moved, the data will be trimmed
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
Review comment:
The count is 6 because the durable cursor (`c1`) is positioned at the end of
the 5th ledger, not yet on the 6th, so only 4 ledgers are deleted. That cursor
would move forward on the next mark-delete.
When advancing the non-durable cursor, we move it to the first available
ledger. That may be before the durable cursor's mark-delete position, but
that's OK.
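To make the arithmetic above concrete, here is a minimal toy model in plain Java. It is not the Pulsar implementation: the class name, the helper methods, and the ledger IDs 0 through 9 are all hypothetical, and the trimming rule (a ledger is removed only if it lies strictly before the ledger holding the durable cursor's mark-delete position) is an assumption taken from the explanation in this comment.

```java
import java.util.TreeMap;

// Toy model only: each ledger ID maps to the number of entries it holds.
final class TrimArithmeticSketch {

    // Ten ledgers with one entry each, as produced by setMaxEntriesPerLedger(1).
    // The IDs 0..9 are illustrative; real ledger IDs are assigned by BookKeeper.
    static TreeMap<Long, Integer> tenSingleEntryLedgers() {
        TreeMap<Long, Integer> ledgers = new TreeMap<>();
        for (long id = 0; id < 10; id++) {
            ledgers.put(id, 1);
        }
        return ledgers;
    }

    // Assumed rule: only ledgers strictly before the mark-delete ledger are
    // trimmed, so everything from that ledger onwards is still counted.
    static long entriesLeftAfterTrim(TreeMap<Long, Integer> ledgers, long markDeleteLedgerId) {
        long total = 0;
        for (int count : ledgers.tailMap(markDeleteLedgerId, true).values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        // positions.get(4) lives in the 5th ledger, which is ID 4 in this toy numbering.
        System.out.println(entriesLeftAfterTrim(tenSingleEntryLedgers(), 4L)); // prints 6
    }
}
```

Under these assumptions, ledgers 0 through 3 are trimmed and ledgers 4 through 9 remain, which matches the value of 6 asserted in the test.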
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
}
}
+ /**
+ * Non-durable cursors have to be moved forward when data is trimmed since
they do not retain that data.
+ * This is to make sure that the `consumedEntries` counter is correctly
updated with the number of skipped
+ * entries and the stats are reported correctly.
+ */
+ private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+ if (ledgersToDelete.isEmpty()) {
+ return;
+ }
+
+ long firstNonDeletedLedger = ledgers
+ .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() -
1).getLedgerId() + 1);
+ PositionImpl highestPositionToDelete = new
PositionImpl(firstNonDeletedLedger, -1);
+
+ cursors.forEach(cursor -> {
+ if (highestPositionToDelete.compareTo((PositionImpl)
cursor.getMarkDeletedPosition()) > 0) {
Review comment:
No need for that: a durable cursor would already have been moved ahead;
otherwise we wouldn't be trimming that ledger.
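The position comparison in the hunk above can be illustrated with a small standalone sketch. Everything here is hypothetical scaffolding: `compare` stands in for `PositionImpl.compareTo` and is assumed to order by ledger ID and then entry ID, the ledger IDs 0 through 9 are made up, and `(0, -1)` is only an assumed mark-delete position for a cursor that has not consumed anything yet.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Toy sketch of the check in advanceNonDurableCursors; not the Pulsar code.
final class AdvanceCheckSketch {

    // Hypothetical stand-in for PositionImpl.compareTo: ledger ID first, then entry ID.
    static int compare(long ledgerA, long entryA, long ledgerB, long entryB) {
        int byLedger = Long.compare(ledgerA, ledgerB);
        return byLedger != 0 ? byLedger : Long.compare(entryA, entryB);
    }

    // Mirrors the diff: first surviving ledger ID at or after (last deleted ID + 1).
    // ceilingKey returns null if no such ledger exists; this sketch assumes one does.
    static long firstNonDeletedLedger(TreeMap<Long, Integer> ledgers, List<Long> ledgersToDelete) {
        long lastDeleted = ledgersToDelete.get(ledgersToDelete.size() - 1);
        return ledgers.ceilingKey(lastDeleted + 1);
    }

    // True when (firstNonDeleted, -1) is strictly after the cursor's mark-delete position.
    static boolean needsAdvance(long firstNonDeleted, long markDeleteLedger, long markDeleteEntry) {
        return compare(firstNonDeleted, -1, markDeleteLedger, markDeleteEntry) > 0;
    }

    public static void main(String[] args) {
        TreeMap<Long, Integer> ledgers = new TreeMap<>();
        for (long id = 0; id < 10; id++) {
            ledgers.put(id, 1);
        }
        long first = firstNonDeletedLedger(ledgers, Arrays.asList(0L, 1L, 2L, 3L));

        // Durable cursor with mark-delete at (4, 0): already past (4, -1).
        System.out.println(needsAdvance(first, 4, 0));  // prints false
        // Cursor still at the assumed start position (0, -1): has to be advanced.
        System.out.println(needsAdvance(first, 0, -1)); // prints true
    }
}
```

In this toy setup the comparison alone leaves the durable cursor untouched, since its mark-delete position is never behind the first surviving ledger. That is consistent with the point made in the comment, though the real invariant depends on how `ManagedLedgerImpl` selects ledgers to trim.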