This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ee1e89ba2d9efde46e24ed32d2c70c84f87c8f0b
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 13 03:05:52 2022 -0800

    If mark-delete operation fails, mark the cursor as "dirty" (#14256)
    
    (cherry picked from commit 8928c3496a61c588b50461d6adaab089dd421619)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 51 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b775978..f9da5c9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1834,6 +1834,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
+                isDirty = true;
                 log.warn("[{}] Failed to mark delete position for cursor={} 
position={}", ledger.getName(),
                         ManagedCursorImpl.this, mdEntry.newPosition);
                 if (log.isDebugEnabled()) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 84a5d66..41537d3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3537,6 +3537,57 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() -1));
     }
 
+
+
+    @Test
+    public void testFlushCursorAfterError() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new 
ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory1 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
+        ManagedLedger ledger1 = 
factory1.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        // Simulate BK write error
+        bkc.failNow(BKException.Code.NotEnoughBookiesException);
+        metadataStore.setAlwaysFail(new 
MetadataStoreException.BadVersionException(""));
+
+        try {
+            c1.markDelete(positions.get(positions.size() - 1));
+            fail("should have failed");
+        } catch (ManagedLedgerException e) {
+            // Expected
+        }
+
+        metadataStore.unsetAlwaysFail();
+
+        // In memory position is updated
+        assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+        Awaitility.await()
+                // Give chance to the flush to be automatically triggered.
+                // NOTE: this can't be set too low, or it causes issues with 
ZK thread pool rejecting
+                .pollDelay(Duration.ofMillis(2000))
+                .untilAsserted(() -> {
+                    // Abruptly re-open the managed ledger without graceful 
close
+                    @Cleanup("shutdown")
+                    ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+                    ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
+                    ManagedCursor c2 = ledger2.openCursor("c");
+
+                    assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+                });
+    }
+
     @Test
     public void testCursorCheckReadPositionChanged() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig());

Reply via email to