This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d56758fc584 [fix][ml] Fix ledger trimming race causing cursor to point to deleted ledgers (#24855)
d56758fc584 is described below

commit d56758fc584038e692e2d5dc2e61b56dd78baaeb
Author: Penghui Li <[email protected]>
AuthorDate: Wed Oct 15 01:44:42 2025 -0700

    [fix][ml] Fix ledger trimming race causing cursor to point to deleted ledgers (#24855)
    
    Co-authored-by: Claude <[email protected]>
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 149 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index acd4a9860d2..f4c4ffbd077 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2593,7 +2593,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
         for (ManagedCursor cursor : cursors) {
-            Position lastAckedPosition = cursor.getMarkDeletedPosition();
+            Position lastAckedPosition = 
cursor.getPersistentMarkDeletedPosition() != null
+                    ? cursor.getPersistentMarkDeletedPosition() : 
cursor.getMarkDeletedPosition();
             LedgerInfo currPointedLedger = 
ledgers.get(lastAckedPosition.getLedgerId());
             LedgerInfo nextPointedLedger = 
Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
                     .map(Map.Entry::getValue).orElse(null);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4bf8a8f7dcc..a78821b646a 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4484,4 +4484,153 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             assertEquals(ml.currentLedgerEntries, 0);
         });
     }
+
+    /**
+     * Verifies that ledger trimming respects the persistent cursor position, 
not just the in-memory position.
+     *
+     * <p><b>Test Flow:</b>
+     * <ol>
+     *   <li><b>Setup:</b> Create 60 entries across multiple ledgers (10 
entries per ledger)
+     *   <li><b>Initial Acks:</b> Delete entries 0, 5-9 and wait for 
persistence
+     *       <ul><li>Persistent position: entry 0</li><li>In-memory position: 
entry 0</li></ul>
+     *   <li><b>Inject Delay:</b> Add 30-second delay to BookKeeper writes 
(simulates slow ZK/BK)
+     *   <li><b>Delayed Acks:</b> Asynchronously delete entries 1-4
+     *       <ul><li>Persistent position: entry 0 (delayed)</li><li>In-memory 
position: entry 9</li></ul>
+     *   <li><b>Pre-Trim Sync:</b> Call {@code 
maybeUpdateCursorBeforeTrimmingConsumedLedger()}
+     *   <li><b>Trigger Trim:</b> Start ledger trimming process
+     *   <li><b>Verify:</b> First ledger is preserved because persistent 
position (entry 0) still points to it
+     * </ol>
+     *
+     * <p><b>Success Criteria:</b>
+     * The first ledger must NOT be deleted, preventing the cursor from 
pointing to a non-existent
+     * ledger after topic reload. This avoids negative backlog calculations.
+     *
+     * <p><b>What This Tests:</b>
+     * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} 
correctly uses the
+     * persistent cursor position (not in-memory) when determining which 
ledgers are safe to trim.
+     */
+    @Test
+    public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
+        final String ledgerName = 
"testCursorPointsToDeletedLedgerAfterTrimAndReload";
+        final String cursorName = "test-cursor";
+
+        // ===== SETUP: Create managed ledger with small ledgers =====
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+        // ===== PHASE 1: Write entries to create multiple ledgers =====
+        int totalEntries = 60;
+        log.info("=== PHASE 1: Writing {} entries to create multiple ledgers 
===", totalEntries);
+        for (int i = 0; i < totalEntries; i++) {
+            Position pos = ledger.addEntry(("message-" + i).getBytes());
+            log.info("Added entry: {}", pos);
+        }
+
+        List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
+        log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
+                ledgersAfterWrite.stream()
+                        .map(l -> String.format("L%d(%d entries)", 
l.getLedgerId(), l.getEntries()))
+                        .toArray());
+
+        assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 
ledgers");
+        long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
+
+        // ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait 
for persistence =====
+        log.info("=== PHASE 2: Acknowledging initial entries in first ledger 
{} ===", firstLedgerId);
+        List<Entry> entries = cursor.readEntries(10);
+
+        // Delete entries 5-9 first (out of order)
+        log.info("Deleting entries 5-9");
+        for (int i = 5; i < 10; i++) {
+            cursor.delete(entries.get(i).getPosition());
+        }
+
+        // Delete entry 0, which advances mark-delete position
+        log.info("Deleting entry 0 - this advances mark-delete position");
+        cursor.delete(entries.get(0).getPosition());
+
+        // Verify in-memory cursor position
+        Position initialMarkDelete = cursor.getMarkDeletedPosition();
+        assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
+                "Mark-delete should be in first ledger");
+        assertEquals(initialMarkDelete.getEntryId(), 
entries.get(0).getEntryId(),
+                "Mark-delete should be at entry 0");
+
+        // Wait for this position to be persisted
+        log.info("Waiting for initial mark-delete position to persist: {}", 
initialMarkDelete);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getPersistentMarkDeletedPosition(), 
initialMarkDelete,
+                    "Persistent position should catch up to in-memory 
position");
+        });
+        log.info("Initial position persisted successfully");
+
+        // ===== PHASE 3: Inject delay to simulate slow persistence =====
+        long delay = 30;
+        log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
+                delay);
+        bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
+
+        // ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence 
will be delayed) =====
+        log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will 
be delayed) ===");
+        for (int i = 1; i < 5; i++) {
+            final int index = i;
+            cursor.asyncDelete(entries.get(i).getPosition(), new 
AsyncCallbacks.DeleteCallback() {
+                @Override
+                public void deleteComplete(Object ctx) {
+                    log.info("Entry {} deletion completed", index);
+                }
+
+                @Override
+                public void deleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                    log.error("Entry {} deletion failed", index, exception);
+                }
+            }, null);
+        }
+
+        // Verify in-memory position has advanced to entry 9
+        Position newMarkDelete = cursor.getMarkDeletedPosition();
+        assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
+                "Mark-delete should still be in first ledger");
+        assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
+                "Mark-delete should have advanced to entry 9 (in-memory)");
+        log.info("In-memory mark-delete position: {}", newMarkDelete);
+
+        // ===== PHASE 5: Update cursor before trimming (important 
synchronization point) =====
+        log.info("=== PHASE 5: Calling 
maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
+        ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+        // ===== PHASE 6: Trigger ledger trimming =====
+        log.info("=== PHASE 6: Triggering ledger trimming ===");
+        CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+        ledger.trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.get();
+        log.info("Trimming completed");
+
+        // ===== VERIFICATION: Ledgers should NOT be trimmed =====
+        log.info("=== VERIFICATION ===");
+
+        // Persistent position should still be at old position (entry 0)
+        Position persistentPosition = 
cursor.getPersistentMarkDeletedPosition();
+        assertEquals(persistentPosition, initialMarkDelete,
+                "Persistent position should not have advanced (delayed)");
+        log.info("Persistent mark-delete position (as expected): {}", 
persistentPosition);
+        log.info("In-memory mark-delete position: {}", newMarkDelete);
+
+        // First ledger should still exist (not trimmed)
+        Awaitility.await().untilAsserted(() -> {
+            long firstRemainingLedger = 
ledger.getFirstPosition().getLedgerId();
+            assertEquals(firstRemainingLedger, 
ledgersAfterWrite.get(0).getLedgerId(),
+                    "First ledger should NOT be trimmed because persistent 
cursor position "
+                            + "is still pointing to it (entry 0)");
+        });
+        log.info("SUCCESS: First ledger {} was correctly preserved", 
firstLedgerId);
+
+        // ===== CLEANUP =====
+        entries.forEach(Entry::release);
+        cursor.close();
+        ledger.close();
+    }
 }

Reply via email to