This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d56758fc584 [fix][ml] Fix ledger trimming race causing cursor to point
to deleted ledgers (#24855)
d56758fc584 is described below
commit d56758fc584038e692e2d5dc2e61b56dd78baaeb
Author: Penghui Li <[email protected]>
AuthorDate: Wed Oct 15 01:44:42 2025 -0700
[fix][ml] Fix ledger trimming race causing cursor to point to deleted
ledgers (#24855)
Co-authored-by: Claude <[email protected]>
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 149 +++++++++++++++++++++
2 files changed, 151 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index acd4a9860d2..f4c4ffbd077 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2593,7 +2593,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
for (ManagedCursor cursor : cursors) {
- Position lastAckedPosition = cursor.getMarkDeletedPosition();
+ Position lastAckedPosition =
cursor.getPersistentMarkDeletedPosition() != null
+ ? cursor.getPersistentMarkDeletedPosition() :
cursor.getMarkDeletedPosition();
LedgerInfo currPointedLedger =
ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger =
Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4bf8a8f7dcc..a78821b646a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4484,4 +4484,153 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertEquals(ml.currentLedgerEntries, 0);
});
}
+
+ /**
+ * Verifies that ledger trimming respects the persistent cursor position,
not just the in-memory position.
+ *
+ * <p><b>Test Flow:</b>
+ * <ol>
+ * <li><b>Setup:</b> Create 60 entries across multiple ledgers (10
entries per ledger)
+ * <li><b>Initial Acks:</b> Delete entries 0, 5-9 and wait for
persistence
+ * <ul><li>Persistent position: entry 0</li><li>In-memory position:
entry 0</li></ul>
+ * <li><b>Inject Delay:</b> Add 30-second delay to BookKeeper writes
(simulates slow ZK/BK)
+ * <li><b>Delayed Acks:</b> Asynchronously delete entries 1-4
+ * <ul><li>Persistent position: entry 0 (delayed)</li><li>In-memory
position: entry 9</li></ul>
+ * <li><b>Pre-Trim Sync:</b> Call {@code
maybeUpdateCursorBeforeTrimmingConsumedLedger()}
+ * <li><b>Trigger Trim:</b> Start ledger trimming process
+ * <li><b>Verify:</b> First ledger is preserved because persistent
position (entry 0) still points to it
+ * </ol>
+ *
+ * <p><b>Success Criteria:</b>
+ * The first ledger must NOT be deleted, preventing the cursor from
pointing to a non-existent
+ * ledger after topic reload. This avoids negative backlog calculations.
+ *
+ * <p><b>What This Tests:</b>
+ * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}
correctly uses the
+ * persistent cursor position (not in-memory) when determining which
ledgers are safe to trim.
+ */
+ @Test
+ public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
+ final String ledgerName =
"testCursorPointsToDeletedLedgerAfterTrimAndReload";
+ final String cursorName = "test-cursor";
+
+ // ===== SETUP: Create managed ledger with small ledgers =====
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(cursorName);
+
+ // ===== PHASE 1: Write entries to create multiple ledgers =====
+ int totalEntries = 60;
+ log.info("=== PHASE 1: Writing {} entries to create multiple ledgers
===", totalEntries);
+ for (int i = 0; i < totalEntries; i++) {
+ Position pos = ledger.addEntry(("message-" + i).getBytes());
+ log.info("Added entry: {}", pos);
+ }
+
+ List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
+ log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
+ ledgersAfterWrite.stream()
+ .map(l -> String.format("L%d(%d entries)",
l.getLedgerId(), l.getEntries()))
+ .toArray());
+
+ assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5
ledgers");
+ long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
+
+ // ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait
for persistence =====
+ log.info("=== PHASE 2: Acknowledging initial entries in first ledger
{} ===", firstLedgerId);
+ List<Entry> entries = cursor.readEntries(10);
+
+ // Delete entries 5-9 first (out of order)
+ log.info("Deleting entries 5-9");
+ for (int i = 5; i < 10; i++) {
+ cursor.delete(entries.get(i).getPosition());
+ }
+
+ // Delete entry 0, which advances mark-delete position
+ log.info("Deleting entry 0 - this advances mark-delete position");
+ cursor.delete(entries.get(0).getPosition());
+
+ // Verify in-memory cursor position
+ Position initialMarkDelete = cursor.getMarkDeletedPosition();
+ assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
+ "Mark-delete should be in first ledger");
+ assertEquals(initialMarkDelete.getEntryId(),
entries.get(0).getEntryId(),
+ "Mark-delete should be at entry 0");
+
+ // Wait for this position to be persisted
+ log.info("Waiting for initial mark-delete position to persist: {}",
initialMarkDelete);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getPersistentMarkDeletedPosition(),
initialMarkDelete,
+ "Persistent position should catch up to in-memory
position");
+ });
+ log.info("Initial position persisted successfully");
+
+ // ===== PHASE 3: Inject delay to simulate slow persistence =====
+ long delay = 30;
+ log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
+ delay);
+ bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
+
+ // ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence
will be delayed) =====
+ log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will
be delayed) ===");
+ for (int i = 1; i < 5; i++) {
+ final int index = i;
+ cursor.asyncDelete(entries.get(i).getPosition(), new
AsyncCallbacks.DeleteCallback() {
+ @Override
+ public void deleteComplete(Object ctx) {
+ log.info("Entry {} deletion completed", index);
+ }
+
+ @Override
+ public void deleteFailed(ManagedLedgerException exception,
Object ctx) {
+ log.error("Entry {} deletion failed", index, exception);
+ }
+ }, null);
+ }
+
+ // Verify in-memory position has advanced to entry 9
+ Position newMarkDelete = cursor.getMarkDeletedPosition();
+ assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
+ "Mark-delete should still be in first ledger");
+ assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
+ "Mark-delete should have advanced to entry 9 (in-memory)");
+ log.info("In-memory mark-delete position: {}", newMarkDelete);
+
+ // ===== PHASE 5: Update cursor before trimming (important
synchronization point) =====
+ log.info("=== PHASE 5: Calling
maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
+ ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+ // ===== PHASE 6: Trigger ledger trimming =====
+ log.info("=== PHASE 6: Triggering ledger trimming ===");
+ CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.get();
+ log.info("Trimming completed");
+
+ // ===== VERIFICATION: Ledgers should NOT be trimmed =====
+ log.info("=== VERIFICATION ===");
+
+ // Persistent position should still be at old position (entry 0)
+ Position persistentPosition =
cursor.getPersistentMarkDeletedPosition();
+ assertEquals(persistentPosition, initialMarkDelete,
+ "Persistent position should not have advanced (delayed)");
+ log.info("Persistent mark-delete position (as expected): {}",
persistentPosition);
+ log.info("In-memory mark-delete position: {}", newMarkDelete);
+
+ // First ledger should still exist (not trimmed)
+ Awaitility.await().untilAsserted(() -> {
+ long firstRemainingLedger =
ledger.getFirstPosition().getLedgerId();
+ assertEquals(firstRemainingLedger,
ledgersAfterWrite.get(0).getLedgerId(),
+ "First ledger should NOT be trimmed because persistent
cursor position "
+ + "is still pointing to it (entry 0)");
+ });
+ log.info("SUCCESS: First ledger {} was correctly preserved",
firstLedgerId);
+
+ // ===== CLEANUP =====
+ entries.forEach(Entry::release);
+ cursor.close();
+ ledger.close();
+ }
}