This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ddd77c985252a7f9ec87005f616a41a4ea1128f6
Author: Addison Higham <[email protected]>
AuthorDate: Sun Feb 28 22:54:05 2021 -0700

    Fix marking individual deletes as dirty (#9732)
    
    * Fix marking individual deletes as dirty
    
    When we mark cursors as dirty, we aren't marking when individual acks
    cause a dirty cursor.
    
    This results in cursors not being flushed and causing redelivery.
    
    This one line fix will ensure we mark the cursor as dirty in this
    situation as well
    
    * add a test
    
    * improve tests to not use sleep
    
    * make the polling rate be slower
    
    (cherry picked from commit 34ca8938ca13ea60b05d164c27d9755855caf87c)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 87 ++++++++++++++++++++--
 2 files changed, 83 insertions(+), 8 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8e02b89..4ff912f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1942,6 +1942,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
+            isDirty = true;
             PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
             LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
                     last -> new MarkDeleteEntry(finalNewMarkDeletePosition, 
last.properties, null, null));
@@ -2929,6 +2930,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         asyncMarkDelete(lastMarkDeleteEntry.newPosition, 
lastMarkDeleteEntry.properties, new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Flushed dirty mark-delete position", 
ledger.getName(), name);
+                }
             }
 
             @Override
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bcdbeca..4f6e1a0 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -3336,18 +3337,88 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
 
-        // Give chance to the flush to be automatically triggered.
-        Thread.sleep(3000);
+        Awaitility.await()
+                // Give chance to the flush to be automatically triggered.
+                // NOTE: this can't be set too low, or it causes issues with 
ZK thread pool rejecting
+                .pollDelay(Duration.ofMillis(2000))
+                .untilAsserted(() -> {
+                    // Abruptly re-open the managed ledger without graceful 
close
+                    ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+                    try {
+                        ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
+                        ManagedCursor c2 = ledger2.openCursor("c");
 
-        // Abruptly re-open the managed ledger without graceful close
-        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
-        ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
-        ManagedCursor c2 = ledger2.openCursor("c");
+                        assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
 
-        assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+                    } finally {
+                        factory2.shutdown();
+                    }
+                });
 
         factory1.shutdown();
-        factory2.shutdown();
+    }
+
+    @Test
+    public void testFlushCursorAfterIndividualDeleteInactivity() throws 
Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new 
ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle(), factoryConfig);
+        ManagedLedger ledger1 = 
factory1.open("testFlushCursorAfterIndDelInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<Position>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        CountDownLatch latch = new CountDownLatch(positions.size());
+
+        positions.forEach(p -> c1.asyncDelete(p, new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object 
ctx) {
+                throw new RuntimeException(exception);
+            }
+        }, null));
+
+        latch.await();
+
+        assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+        // reopen the cursor and we should see entries not be flushed
+        ManagedLedgerFactory dirtyFactory = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
+        ManagedLedger ledgerDirty = 
dirtyFactory.open("testFlushCursorAfterIndDelInactivity", config);
+        ManagedCursor dirtyCursor = ledgerDirty.openCursor("c");
+
+        assertNotEquals(dirtyCursor.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+        Awaitility.await()
+                // Give chance to the flush to be automatically triggered.
+                // NOTE: this can't be set too low, or it causes issues with 
ZK thread pool rejecting
+                .pollDelay(Duration.ofMillis(2000))
+                .untilAsserted(() -> {
+                    // Abruptly re-open the managed ledger without graceful 
close
+                    ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+                    try {
+                        ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterIndDelInactivity", config);
+                        ManagedCursor c2 = ledger2.openCursor("c");
+
+                        assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+                    } finally {
+                        factory2.shutdown();
+                    }
+                });
+
+        factory1.shutdown();
+        dirtyFactory.shutdown();
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);

Reply via email to