merlimat commented on a change in pull request #7236:
URL: https://github.com/apache/pulsar/pull/7236#discussion_r439014421



##########
File path: 
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
##########
@@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);

Review comment:
       That's because the other cursor is positioned at the end of the 5th 
ledger, not yet on the 6th, so only 4 ledgers are deleted. That cursor 
will move forward on the next mark-delete.
   
   When advancing the non-durable cursor, we advance to the first available 
ledger. That might be before the durable cursor's mark-delete position, but 
that's OK.
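   
   To make the arithmetic concrete, here is a minimal sketch of the counting in this test. It is not the managed-ledger code: `TrimSketch` and `entriesLeftAfterTrim` are hypothetical names, ledger ids are modeled as 0-based integers, and it assumes that only ledgers strictly before the one holding the mark-delete position get trimmed.

```java
import java.util.TreeMap;

class TrimSketch {
    // Hypothetical model: ledger id -> entry count, with one entry per ledger
    // (mirrors setMaxEntriesPerLedger(1) in the test).
    static int entriesLeftAfterTrim(int totalLedgers, int markDeletedLedger) {
        TreeMap<Integer, Integer> ledgers = new TreeMap<>();
        for (int id = 0; id < totalLedgers; id++) {
            ledgers.put(id, 1);
        }
        // Assumption: the ledger holding the mark-delete position is kept,
        // so only ledgers strictly before it are removed.
        ledgers.headMap(markDeletedLedger, false).clear();
        // Entries still readable by a cursor parked on the first available ledger.
        return ledgers.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

   With 10 ledgers and the mark-delete on the 5th one (id 4 in this model), `TrimSketch.entriesLeftAfterTrim(10, 4)` drops ids 0 to 3 and leaves 6 entries, which is the value the test asserts.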
   

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
+    /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they do not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {

Review comment:
       No need for that: a durable cursor would already have been moved ahead; 
otherwise we wouldn't be trimming that ledger. 
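   
   A rough sketch of why the position comparison alone is enough, under stated assumptions: `AdvanceSketch`, `Pos`, `advanceTarget` and `needsAdvance` are hypothetical stand-ins (not the real `PositionImpl` or `ManagedLedgerImpl`), ledger ids are 0-based, and positions are ordered by ledger id and then entry id.

```java
import java.util.List;
import java.util.TreeMap;

class AdvanceSketch {
    // Hypothetical stand-in for PositionImpl: ordered by ledger id, then entry id.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Position just before the first entry of the first ledger that is kept.
    // Assumes at least one ledger after the deleted ones still exists;
    // TreeMap.ceilingKey returns null otherwise.
    static Pos advanceTarget(TreeMap<Long, Integer> ledgers, List<Long> ledgersToDelete) {
        long lastDeleted = ledgersToDelete.get(ledgersToDelete.size() - 1);
        long firstKept = ledgers.ceilingKey(lastDeleted + 1);
        return new Pos(firstKept, -1);
    }

    // A cursor is moved only if its mark-delete position is behind the target.
    static boolean needsAdvance(Pos target, Pos markDeleted) {
        return target.compareTo(markDeleted) > 0;
    }
}
```

   In this model, deleting ledgers 0 to 3 gives a target of (4, -1). A durable cursor with its mark-delete at (4, 0) compares as already ahead and is left alone, while a cursor still sitting at (0, -1) is advanced, so no explicit durable check is needed.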




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

