This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e1fcd0c50 [fix] [ml] Mark delete stuck due to switching cursor 
ledger fails (#22662)
a7e1fcd0c50 is described below

commit a7e1fcd0c508d2a0ee1e6b0fbffa5ae397db5948
Author: fengyubiao <[email protected]>
AuthorDate: Tue May 7 14:54:31 2024 +0800

    [fix] [ml] Mark delete stuck due to switching cursor ledger fails (#22662)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 19 +++++++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2f33639c3d..3671385e60f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2187,8 +2187,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
                 persistPositionToMetaStore(mdEntry, cb);
             } else {
-                mdEntry.callback.markDeleteFailed(new 
ManagedLedgerException("Create new cursor ledger failed"),
-                        mdEntry.ctx);
+                cb.operationFailed(new ManagedLedgerException("Switch new 
cursor ledger failed"));
             }
         } else {
             persistPositionToLedger(cursorLedger, mdEntry, cb);
@@ -2861,9 +2860,19 @@ public class ManagedCursorImpl implements ManagedCursor {
                 synchronized (pendingMarkDeleteOps) {
                     // At this point we don't have a ledger ready
                     STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
-                    // Note: if the stat is NoLedger, will persist the mark 
deleted position to metadata store.
-                    // Before giving up, try to persist the position in the 
metadata store.
-                    flushPendingMarkDeletes();
+                    // There are two case may cause switch ledger fails.
+                    // 1. No enough BKs; BKs are in read-only mode...
+                    // 2. Write ZK fails.
+                    // Regarding the case "No enough BKs", try to persist the 
position in the metadata store before
+                    // giving up.
+                    if (!(exception instanceof MetaStoreException)) {
+                        flushPendingMarkDeletes();
+                    } else {
+                        while (!pendingMarkDeleteOps.isEmpty()) {
+                            MarkDeleteEntry entry = 
pendingMarkDeleteOps.poll();
+                            entry.callback.markDeleteFailed(exception, 
entry.ctx);
+                        }
+                    }
                 }
             }
         });
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4e3f8b79084..5c10533e247 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -270,6 +270,48 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ml.delete();
     }
 
+    @Test
+    void testSwitchLedgerFailed() throws Exception {
+        final String cursorName = "c1";
+        final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+        final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
+        mlConfig.setMaxEntriesPerLedger(1);
+        mlConfig.setMetadataMaxEntriesPerLedger(1);
+        mlConfig.setThrottleMarkDelete(Double.MAX_VALUE);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, 
mlConfig);
+        ManagedCursor cursor = ml.openCursor(cursorName);
+
+        List<Position> positionList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positionList.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+
+        // Inject an error when persistent at the third time.
+        AtomicInteger persistentCounter = new AtomicInteger();
+        metadataStore.failConditional(new 
MetadataStoreException.BadVersionException("mock error"), (op, path) -> {
+            if (path.equals(String.format("/managed-ledgers/%s/%s", mlName, 
cursorName))
+                    && persistentCounter.incrementAndGet() == 3) {
+                log.info("Trigger an error");
+                return true;
+            }
+            return false;
+        });
+
+        // Verify: the cursor can be recovered after it fails once.
+        int failedCount = 0;
+        for (Position position : positionList) {
+            try {
+                cursor.markDelete(position);
+            } catch (Exception ex) {
+                failedCount++;
+            }
+        }
+        assertEquals(failedCount, 1);
+
+        // cleanup.
+        ml.delete();
+    }
+
     @Test
     void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception 
{
         final int entryCount = 10;

Reply via email to