codelipenghui commented on a change in pull request #7236:
URL: https://github.com/apache/pulsar/pull/7236#discussion_r438490315
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
}
}
+ /**
+ * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+ * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+ * entries and the stats are reported correctly.
+ */
+ private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+ if (ledgersToDelete.isEmpty()) {
+ return;
+ }
+
+ long firstNonDeletedLedger = ledgers
+ .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);
Review comment:
```suggestion
long firstNonDeletedLedger = ledgers
.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
```
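For context on the suggestion: with `Long` keys, `ceilingKey(k + 1)` and `higherKey(k)` on a `NavigableMap` return the same key, and both return `null` when no larger key exists, which would throw on unboxing into a `long`. A minimal sketch, assuming a plain `TreeMap` as a stand-in for the `ledgers` map (the class name and sample ids are hypothetical):

```java
import java.util.TreeMap;

// Hypothetical stand-in for the `ledgers` map: ledger id -> placeholder value.
class HigherKeyDemo {
    static TreeMap<Long, String> sampleLedgers() {
        TreeMap<Long, String> ledgers = new TreeMap<>();
        ledgers.put(3L, "a");
        ledgers.put(4L, "b");
        ledgers.put(7L, "c");
        return ledgers;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ledgers = sampleLedgers();
        long lastDeleted = 4L; // id of the last ledger being deleted (sample value)

        // Least key >= lastDeleted + 1
        Long viaCeiling = ledgers.ceilingKey(lastDeleted + 1);
        // Least key strictly greater than lastDeleted
        Long viaHigher = ledgers.higherKey(lastDeleted);
        System.out.println(viaCeiling.equals(viaHigher));

        // Both methods return null when there is no larger key, so the result
        // should be kept as a Long and checked before assigning it to a long.
        Long none = ledgers.higherKey(7L);
        System.out.println(none == null);
    }
}
```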
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
##########
@@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
ledger.close();
}
+ @Test
+ public void testBacklogStatsWhenDroppingData() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ ManagedCursor c1 = ledger.openCursor("c1");
+ ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+ List<Position> positions = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+ }
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+ c1.markDelete(positions.get(4));
+ assertEquals(c1.getNumberOfEntries(), 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+ // Since the durable cursor has moved, the data will be trimmed
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
Review comment:
Why does the durable cursor have a backlog of 5 while the non-durable cursor has a backlog of 6?
Shouldn't they be the same?
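One possible reading of the 5 versus 6 difference, sketched under an explicit assumption: if trimming only removes ledgers strictly before the one that holds the durable cursor's mark-delete position, then with one entry per ledger the ledger containing `positions.get(4)` is retained, and a cursor moved to the start of the first retained ledger still counts that entry. The model below is hypothetical and is not the `ManagedLedgerImpl` logic; it only reproduces the arithmetic.

```java
import java.util.TreeMap;

// Hypothetical model: one entry per ledger, ledger ids 0..totalLedgers-1.
class TrimBacklogSketch {
    // Entries visible to a cursor placed at the start of the first retained ledger.
    static int backlogAfterTrim(int totalLedgers, int markDeletedIndex) {
        TreeMap<Long, Integer> ledgers = new TreeMap<>(); // ledger id -> entry count
        for (long id = 0; id < totalLedgers; id++) {
            ledgers.put(id, 1);
        }
        // ASSUMPTION: only ledgers strictly before the mark-deleted ledger are trimmed.
        ledgers.headMap((long) markDeletedIndex, false).clear();
        return ledgers.values().stream().mapToInt(Integer::intValue).sum();
    }

    // Entries after the mark-delete position, as seen by the durable cursor.
    static int durableBacklog(int totalLedgers, int markDeletedIndex) {
        return totalLedgers - (markDeletedIndex + 1);
    }

    public static void main(String[] args) {
        System.out.println(durableBacklog(10, 4));
        System.out.println(backlogAfterTrim(10, 4));
    }
}
```

Under that assumption the two counts differ by exactly the one entry in the retained ledger that the durable cursor has already acknowledged.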
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
}
}
+ /**
+ * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+ * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+ * entries and the stats are reported correctly.
+ */
+ private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+ if (ledgersToDelete.isEmpty()) {
+ return;
+ }
+
+ long firstNonDeletedLedger = ledgers
+ .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);
+ PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+ cursors.forEach(cursor -> {
+ if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
Review comment:
Should we add a check here that the cursor is non-durable?
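A minimal sketch of the kind of guard this comment asks about, using a hypothetical `Cursor` class with a `durable` flag rather than the real cursor types; the position is reduced to a ledger id for brevity:

```java
import java.util.ArrayList;
import java.util.List;

class DurableCheckSketch {
    // Hypothetical minimal cursor; only the fields this sketch needs.
    static final class Cursor {
        final String name;
        final boolean durable;
        long markDeletedLedger;

        Cursor(String name, boolean durable, long markDeletedLedger) {
            this.name = name;
            this.durable = durable;
            this.markDeletedLedger = markDeletedLedger;
        }
    }

    // Advances only non-durable cursors that are behind the first retained ledger
    // and returns the names of the cursors that were moved.
    static List<String> advanceNonDurable(List<Cursor> cursors, long firstNonDeletedLedger) {
        List<String> moved = new ArrayList<>();
        for (Cursor c : cursors) {
            if (!c.durable && c.markDeletedLedger < firstNonDeletedLedger) {
                c.markDeletedLedger = firstNonDeletedLedger;
                moved.add(c.name);
            }
        }
        return moved;
    }
}
```

With the explicit `!c.durable` condition, a durable cursor is never touched by this path, whatever its position.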