This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a7e1fcd0c50 [fix] [ml] Mark delete stuck due to switching cursor
ledger fails (#22662)
a7e1fcd0c50 is described below
commit a7e1fcd0c508d2a0ee1e6b0fbffa5ae397db5948
Author: fengyubiao <[email protected]>
AuthorDate: Tue May 7 14:54:31 2024 +0800
[fix] [ml] Mark delete stuck due to switching cursor ledger fails (#22662)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 19 +++++++---
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2f33639c3d..3671385e60f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2187,8 +2187,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
persistPositionToMetaStore(mdEntry, cb);
} else {
- mdEntry.callback.markDeleteFailed(new
ManagedLedgerException("Create new cursor ledger failed"),
- mdEntry.ctx);
+ cb.operationFailed(new ManagedLedgerException("Switch new
cursor ledger failed"));
}
} else {
persistPositionToLedger(cursorLedger, mdEntry, cb);
@@ -2861,9 +2860,19 @@ public class ManagedCursorImpl implements ManagedCursor {
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
- // Note: if the stat is NoLedger, will persist the mark
deleted position to metadata store.
- // Before giving up, try to persist the position in the
metadata store.
- flushPendingMarkDeletes();
+ // There are two cases that may cause the ledger switch to fail.
+ // 1. Not enough BKs; BKs are in read-only mode...
+ // 2. Writing to ZK fails.
+ // Regarding the case "Not enough BKs", try to persist the
position in the metadata store before
+ // giving up.
+ if (!(exception instanceof MetaStoreException)) {
+ flushPendingMarkDeletes();
+ } else {
+ while (!pendingMarkDeleteOps.isEmpty()) {
+ MarkDeleteEntry entry =
pendingMarkDeleteOps.poll();
+ entry.callback.markDeleteFailed(exception,
entry.ctx);
+ }
+ }
}
}
});
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4e3f8b79084..5c10533e247 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -270,6 +270,48 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ml.delete();
}
+ @Test
+ void testSwitchLedgerFailed() throws Exception {
+ final String cursorName = "c1";
+ final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+ final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
+ mlConfig.setMaxEntriesPerLedger(1);
+ mlConfig.setMetadataMaxEntriesPerLedger(1);
+ mlConfig.setThrottleMarkDelete(Double.MAX_VALUE);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName,
mlConfig);
+ ManagedCursor cursor = ml.openCursor(cursorName);
+
+ List<Position> positionList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ positionList.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+
+ // Inject an error on the third metadata store operation against the cursor's path.
+ AtomicInteger persistentCounter = new AtomicInteger();
+ metadataStore.failConditional(new
MetadataStoreException.BadVersionException("mock error"), (op, path) -> {
+ if (path.equals(String.format("/managed-ledgers/%s/%s", mlName,
cursorName))
+ && persistentCounter.incrementAndGet() == 3) {
+ log.info("Trigger an error");
+ return true;
+ }
+ return false;
+ });
+
+ // Verify: the cursor can be recovered after it fails once.
+ int failedCount = 0;
+ for (Position position : positionList) {
+ try {
+ cursor.markDelete(position);
+ } catch (Exception ex) {
+ failedCount++;
+ }
+ }
+ assertEquals(failedCount, 1);
+
+ // cleanup.
+ ml.delete();
+ }
+
@Test
void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception
{
final int entryCount = 10;