This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0f17a2483a30864b806e53253da9938e612249cd
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 2 05:28:22 2026 -0800

    [fix][ml] Fix cursor backlog size to account for individual acks (#25089)
    
    (cherry picked from commit 793e9472eebcbfc142c3f0e2df2b7be2de21bdca)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 64 +++++++++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 75 ++++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8b2c525af0c..397aee49efc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1281,7 +1281,69 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public long getEstimatedSizeSinceMarkDeletePosition() {
-        return ledger.estimateBacklogFromPosition(markDeletePosition);
+        long totalSize = ledger.estimateBacklogFromPosition(markDeletePosition);
+
+        // Need to subtract size of individual deleted messages
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Calculating backlog size for cursor {} from position {}, totalSize: {}",
+                    ledger.getName(), name, markDeletePosition, totalSize);
+        }
+
+        // Get count of individually deleted entries in the backlog range
+        long deletedCount = 0;
+        lock.readLock().lock();
+        try {
+            Range<Position> backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition());
+
+            if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
+                deletedCount = individualDeletedMessages.cardinality(
+                        backlogRange.lowerEndpoint().getLedgerId(), backlogRange.lowerEndpoint().getEntryId(),
+                        backlogRange.upperEndpoint().getLedgerId(), backlogRange.upperEndpoint().getEntryId());
+            } else {
+                AtomicLong deletedCounter = new AtomicLong(0);
+                individualDeletedMessages.forEach((r) -> {
+                    if (r.isConnected(backlogRange)) {
+                        Range<Position> intersection = r.intersection(backlogRange);
+                        long countInRange = ledger.getNumberOfEntries(intersection);
+                        deletedCounter.addAndGet(countInRange);
+                    }
+                    return true;
+                }, recyclePositionRangeConverter);
+                deletedCount = deletedCounter.get();
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        if (deletedCount == 0) {
+            return totalSize;
+        }
+
+        // Estimate size by using average entry size from the backlog range
+        Range<Position> backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition());
+        long totalEntriesInBacklog = ledger.getNumberOfEntries(backlogRange);
+
+        if (totalEntriesInBacklog <= deletedCount || totalEntriesInBacklog == 0) {
+            // Should not happen, but avoid division by zero
+            log.warn("[{}] [{}] Inconsistent state: totalEntriesInBacklog={}, deletedCount={}",
+                    ledger.getName(), name, totalEntriesInBacklog, deletedCount);
+            return Math.max(0, totalSize);  // Return the total size and log the issue
+        }
+
+        // Calculate average size in the backlog range
+        long averageSize = totalSize / totalEntriesInBacklog;
+
+        // Subtract size of deleted entries
+        long deletedSize = deletedCount * averageSize;
+        long adjustedSize = totalSize - deletedSize;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, "
+                            + "deletedSize={}, adjustedSize={}",
+                    ledger.getName(), name, totalSize, deletedCount, averageSize, deletedSize, adjustedSize);
+        }
+
+        return adjustedSize;
     }
 
     private long getNumberOfEntriesInBacklog() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 72b5aeccd87..7523934ec10 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3675,6 +3675,81 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length);
     }
 
+    /**
+     * Test that cursor.getEstimatedSizeSinceMarkDeletePosition() correctly accounts for individual
+     * message deletions (asyncDelete/individual ack).
+     *
+     * This verifies the fix: when messages are acknowledged out of order using asyncDelete,
+     * the backlog size is now correctly adjusted to reflect the individually deleted messages.
+     */
+    @Test(timeOut = 20000)
+    public void testEstimatedSizeWithIndividualAcks() throws Exception {
+        ManagedLedger ledger = factory.open("test_cursor_backlog_size_individual_acks");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        // Each entry is 100 bytes
+        byte[] entryData = new byte[100];
+
+        // Add 5 entries: positions should be 0:0, 0:1, 0:2, 0:3, 0:4
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            positions.add(ledger.addEntry(entryData));
+        }
+
+        // Initial state: 5 entries * 100 bytes = 500 bytes
+        assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 500);
+
+        // Read all entries so they can be acknowledged
+        List<Entry> entries = cursor.readEntries(5);
+        assertEquals(entries.size(), 5);
+        entries.forEach(Entry::release);
+
+        // Individual acknowledge positions 1, 3, 4 (leaving 0:0 and 0:2 unacknowledged)
+        AtomicInteger callbackCount = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(3);
+
+        DeleteCallback callback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                callbackCount.incrementAndGet();
+                latch.countDown();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                latch.countDown();
+            }
+        };
+
+        cursor.asyncDelete(positions.get(1), callback, null);
+        cursor.asyncDelete(positions.get(3), callback, null);
+        cursor.asyncDelete(positions.get(4), callback, null);
+
+        // Wait for async operations to complete
+        assertTrue(latch.await(5, TimeUnit.SECONDS), "Deletes should complete");
+        assertEquals(callbackCount.get(), 3, "All 3 deletes should succeed");
+
+        // Get current state
+        // After fix: should now account for individual deleted messages
+        long expectedBacklogSize = 200;  // 2 remaining entries (0:0, 0:2) * 100 bytes
+        long actualBacklogSize = cursor.getEstimatedSizeSinceMarkDeletePosition();
+        Position markDeletePos = cursor.getMarkDeletedPosition();
+
+        log.info("Backlog size after individual acks:");
+        log.info("  Expected: {}. Actual: {}", expectedBacklogSize, actualBacklogSize);
+        log.info("  Mark delete position: {}", markDeletePos);
+        log.info("  Individual deleted: {}", ((ManagedCursorImpl) cursor).getIndividuallyDeletedMessagesSet());
+
+        // After fix: backlog size should now correctly account for individual deletions
+        assertEquals(actualBacklogSize, expectedBacklogSize,
+                "Backlog size should account for individual deletions");
+
+        // Verify both count and size are correct
+        assertEquals(cursor.getNumberOfEntriesInBacklog(true), 2, "Backlog count should be 2");
+
+        ledger.close();
+    }
+
     @Test(timeOut = 20000)
     public void testRecoverCursorAheadOfLastPosition() throws Exception {
         final String mlName = "my_test_ledger";

Reply via email to