This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2bdfe881f3d4156feca0098aa83ef7a5c914ed79 Author: Matteo Merli <[email protected]> AuthorDate: Sun Feb 13 03:05:52 2022 -0800 If mark-delete operation fails, mark the cursor as "dirty" (#14256) (cherry picked from commit 8928c3496a61c588b50461d6adaab089dd421619) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 + .../bookkeeper/mledger/impl/ManagedCursorTest.java | 52 ++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f4c450883a6..cb4847ba679 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1750,6 +1750,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void operationFailed(ManagedLedgerException exception) { + isDirty = true; log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(), ManagedCursorImpl.this, mdEntry.newPosition); if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index b6c40bcfc69..d9f629d5ac4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -3424,6 +3425,57 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { dirtyFactory.shutdown(); } + + + @Test + public void testFlushCursorAfterError() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1.0); + + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCursorPositionFlushSeconds(1); + + @Cleanup("shutdown") + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig); + ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config); + ManagedCursor c1 = ledger1.openCursor("c"); + List<Position> positions = new ArrayList<>(); + + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + // Simulate BK write error + bkc.failNow(BKException.Code.NotEnoughBookiesException); + zkc.setAlwaysFail(Code.BADVERSION); + + try { + c1.markDelete(positions.get(positions.size() - 1)); + fail("should have failed"); + } catch (ManagedLedgerException e) { + // Expected + } + + zkc.unsetAlwaysFail(); + + // In memory position is updated + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + Awaitility.await() + // Give chance to the flush to be automatically triggered. + // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting + .pollDelay(Duration.ofMillis(2000)) + .untilAsserted(() -> { + // Abruptly re-open the managed ledger without graceful close + @Cleanup("shutdown") + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); + + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + }); + } + @Test public void testConsistencyOfIndividualMessages() throws Exception { ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
