This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9bafb849834 [fix][test] Fix ManagedCursorTest and NonDurableCursorTest 
flaky tests (#25101)
9bafb849834 is described below

commit 9bafb849834199a6300ef6a760ad73c39112c251
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Dec 24 00:02:01 2025 +0800

    [fix][test] Fix ManagedCursorTest and NonDurableCursorTest flaky tests 
(#25101)
    
    (cherry picked from commit 9c5e1c310e20d15141da6792fbf3434007278d48)
---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 194 +++++++++++++++++++--
 .../mledger/impl/NonDurableCursorTest.java         |  10 +-
 .../mledger/util/ManagedLedgerTestUtil.java        |  54 ++++++
 3 files changed, 239 insertions(+), 19 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 711c70b5a17..72b5aeccd87 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
 import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
@@ -49,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -60,6 +62,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -110,6 +113,7 @@ import 
org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
 import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.collections4.iterators.EmptyIterator;
@@ -577,7 +581,10 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         @Cleanup("shutdown")
         ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
-        ledger = factory2.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        // Retry the open to show that it eventually succeeds despite the race condition.
+        ledger = ManagedLedgerTestUtil.retry(
+                () -> factory2.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1)));
+
 
         c1 = ledger.openCursor("c1");
         c2 = ledger.openCursor("c2");
@@ -1246,7 +1253,8 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
     @Test(timeOut = 20000)
     void cursorPersistence() throws Exception {
-        ManagedLedger ledger = factory.open("my_test_ledger");
+        // Open cursor_persistence_ledger ledger, create ledger 3.
+        ManagedLedger ledger = factory.open("cursor_persistence_ledger");
         ManagedCursor c1 = ledger.openCursor("c1");
         ManagedCursor c2 = ledger.openCursor("c2");
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
@@ -1258,26 +1266,28 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         List<Entry> entries = c1.readEntries(3);
         Position p1 = entries.get(2).getPosition();
+        // Mark delete, create ledger 4 due to cursor ledger state is NoLedger.
         c1.markDelete(p1);
         entries.forEach(Entry::release);
 
         entries = c1.readEntries(4);
         Position p2 = entries.get(2).getPosition();
+        // Mark delete, create ledger 5 due to cursor ledger state is NoLedger.
         c2.markDelete(p2);
         entries.forEach(Entry::release);
 
         // Reopen
-
         @Cleanup("shutdown")
         ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
-        ledger = factory2.open("my_test_ledger");
+        // Recovery open cursor_persistence_ledger ledger, create ledger 6, 
and move mark delete position to 6:-1.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        ledger = factory2.open("cursor_persistence_ledger");
         c1 = ledger.openCursor("c1");
         c2 = ledger.openCursor("c2");
 
         assertEquals(c1.getMarkDeletedPosition(), p1);
-        // move mark-delete-position from 3:5 to 6:-1 since all the entries 
have been consumed
         ManagedCursor finalC2 = c2;
-        Awaitility.await().untilAsserted(() -> 
assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
+        Awaitility.await().untilAsserted(() -> 
assertThat(finalC2.getMarkDeletedPosition()).isGreaterThan(p2));
     }
 
     @Test(timeOut = 20000)
@@ -1348,7 +1358,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(c4.getMarkDeletedPosition(), p1);
     }
 
-    @Test
+    @Test(timeOut = 20000)
     public void asyncMarkDeleteBlocking() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
@@ -1383,16 +1393,166 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         }
 
         latch.await();
+        assertEquals(c1.getNumberOfEntries(), 0);
+
+        // Reopen
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // flaky test case: factory2.open() may throw 
MetadataStoreException$BadVersionException, race condition:
+        // 1. factory2.open() triggers ledger recovery, read versionA 
ManagedLedgerInfo of my_test_ledger ledger.
+        // 2. my_test_ledger ledger rollover triggers 
MetaStoreImpl.asyncUpdateLedgerIds(), update versionB
+        //    ManagedLedgerInfo into metaStore.
+        // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), 
update versionA ManagedLedgerInfo
+        //    into metaStore, then throws BadVersionException and moves 
my_test_ledger ledger to fenced state.
+        // Recovery-open my_test_ledger, which creates a new ledger (ledgerId++).
+        // Retry the open to show that it eventually succeeds despite the race condition.
+        ledger = ManagedLedgerTestUtil.retry(() -> 
factory2.open("my_test_ledger"));
+        ManagedCursor c2 = ledger.openCursor("c1");
+
+        // Three cases:
+        // 1. cursor recovered with lastPosition markDeletePosition
+        // 2. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger not rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+2:-1)
+        // 3. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+3:-1)
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get());
+    }
 
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        // Open async_mark_delete_blocking_test_ledger, which creates ledger 3.
+        ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+        // Only used for debug logging.
+        Deque<Position> positions = new ConcurrentLinkedDeque<>();
+        // First failure reported by an add or mark-delete callback. It is checked after each latch so that a
+        // failure surfaces as an assertion error instead of a hang until the test timeout.
+        final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();
+
+        // The previous (flaky) version of this test used num=100, and PR
+        // https://github.com/apache/pulsar/pull/25087 makes that version even flakier, because the cursor
+        // may be recovered in either of two states:
+        //   1. markDeletePosition 12:9, persistentMarkDeletePosition 12:9.
+        //   2. markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1.
+        // With num=101, ledger 13 is always created and becomes the active (last) ledger, so the cursor is
+        // always recovered with markDeletePosition 13:0 and persistentMarkDeletePosition 13:0.
+        final int num = 101;
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        // 10 entries per ledger: creates ledgers 4~13.
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    failure.compareAndSet(null, exception);
+                    addEntryLatch.countDown();
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    lastPosition.set(position);
+                    positions.offer(position);
+                    addEntryLatch.countDown();
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+        assertThat(failure.get()).isNull();
+
+        // With num=100, a Thread.sleep(1000) would be needed here to let the ledger rollover finish, and even
+        // that could not guarantee that c1 is always recovered with markDeletePosition 12:9.
+
+        final CountDownLatch markDeleteLatch = new CountDownLatch(1);
+        // Mark delete; this creates ledger 14 because the cursor ledger state is NoLedger.
+        // In the flaky num=100 variant, mark-delete is triggered twice:
+        //   1. by c1.asyncMarkDelete(), with markDeletePosition 12:9;
+        //   2. by ManagedLedgerImpl.updateLedgersIdsComplete() after the full ledger rolls over. All entries
+        //      in ledger 12 have been consumed, so PR https://github.com/apache/pulsar/pull/25087 moves both
+        //      persistentMarkDeletePosition and markDeletePosition to 13:-1 (before that PR,
+        //      persistentMarkDeletePosition was not moved).
+        // The two mark-delete operations are triggered almost simultaneously, with no ordering guarantee:
+        //   1. the main thread calls c1.asyncMarkDelete();
+        //   2. the bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread creates ledger 13 because OpAddEntry
+        //      rolls over the full ledger (OpAddEntry closes the ledger and creates a new one when
+        //      closeWhenDone is true).
+        // In ManagedLedgerImpl, the MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(),
+        // which calls cursor.asyncMarkDelete(), so the rollover's mark-delete may run after
+        // AddEntryCallback.addComplete(). The root cause is that the cursor.asyncMarkDelete() call made there
+        // does not propagate its completion or failure to its caller's callback.
+        c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                failure.compareAndSet(null, exception);
+                markDeleteLatch.countDown();
+            }
+
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                markDeleteLatch.countDown();
+            }
+        }, null);
+        markDeleteLatch.await();
+        assertThat(failure.get()).isNull();
         assertEquals(c1.getNumberOfEntries(), 0);

         // Reopen
-        @Cleanup("shutdown")
-        ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
-        ledger = factory2.open("my_test_ledger");
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Recovery-open async_mark_delete_blocking_test_ledger, which creates ledger 15.
+        // When ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() runs, curPointedLedger is 13
+        // and nextPointedLedger is 15. Ledger 13 only holds the consumed entry 13:0, so markDeletePosition is
+        // moved to 15:-1 (see PR https://github.com/apache/pulsar/pull/25087).
+        ledger = factory2.open("async_mark_delete_blocking_test_ledger");
         ManagedCursor c2 = ledger.openCursor("c1");

-        assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+        log.info("positions size: {}, positions: {}", positions.size(), positions);
+        // Wait until maybeUpdateCursorBeforeTrimmingConsumedLedger() has completed, i.e. until
+        // c2.getMarkDeletedPosition() reaches 15:-1 (see PR https://github.com/apache/pulsar/pull/25087).
+        Awaitility.await()
+                .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
+    }
+
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithMultiShots() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        // Keep the cursor's metadata ledger small so that the mark-deletes below can roll it over.
+        config.setMetadataMaxEntriesPerLedger(5);
+        ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+        // First failure reported by an add or mark-delete callback. It is checked after the latch so that a
+        // failure surfaces as an assertion error instead of a hang until the test timeout.
+        final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();
+
+        final int num = 101;
+        // Counted down once per entry: when its mark-delete completes, or when either step fails.
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    failure.compareAndSet(null, exception);
+                    addEntryLatch.countDown();
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    lastPosition.set(position);
+                    // Mark-delete the position handed to this callback rather than re-reading the shared
+                    // reference, which another callback may already have overwritten.
+                    c1.asyncMarkDelete(position, new MarkDeleteCallback() {
+                        @Override
+                        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                            failure.compareAndSet(null, exception);
+                            addEntryLatch.countDown();
+                        }
+
+                        @Override
+                        public void markDeleteComplete(Object ctx) {
+                            addEntryLatch.countDown();
+                        }
+                    }, null);
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+        assertThat(failure.get()).isNull();
+        assertEquals(c1.getNumberOfEntries(), 0);
+
+        // Reopen
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+        ManagedCursor c2 = ledger.openCursor("c1");
+
+        // c2.getMarkDeletedPosition() may point to lastPositionLedgerId+1 or lastPositionLedgerId+2, because
+        // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        Awaitility.await()
+                .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
     }
 
     @Test(timeOut = 20000)
@@ -1402,7 +1562,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         final ManagedCursor c1 = ledger.openCursor("c1");
 
         final int num = 100;
-        List<Position> positions = new ArrayList();
+        List<Position> positions = new ArrayList<>();
         for (int i = 0; i < num; i++) {
             Position p = ledger.addEntry("dummy-entry".getBytes(Encoding));
             positions.add(p);
@@ -1431,10 +1591,11 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         // Reopen
         @Cleanup("shutdown")
         ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
-        ledger = factory2.open("my_test_ledger");
+        // Retry the open to show that it eventually succeeds despite the race condition.
+        ledger = ManagedLedgerTestUtil.retry(() -> 
factory2.open("my_test_ledger"));
         ManagedCursor c2 = ledger.openCursor("c1");
 
-        assertEquals(c2.getMarkDeletedPosition(), lastPosition);
+        
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
     }
 
     @Test(timeOut = 20000)
@@ -4499,7 +4660,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
-    @Test
+    @Test(timeOut = 20000)
     public void testLazyCursorLedgerCreationForSubscriptionCreation() throws 
Exception {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         ManagedLedgerImpl ledger =
@@ -4511,7 +4672,8 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger = (ManagedLedgerImpl) 
factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
         assertNotNull(ledger.getCursors().get("test"));
         ManagedCursorImpl cursor1 = (ManagedCursorImpl) 
ledger.openCursor("test");
-        assertEquals(cursor1.getMarkDeletedPosition(), p1);
+        // Reopen ledger may move cursor to next position. See PR 
https://github.com/apache/pulsar/pull/25087.
+        
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(p1);
         factory2.shutdown();
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 79ab89fa6f4..afd560afa7d 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -50,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -567,16 +569,18 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         }
 
         latch.await();
-
         assertEquals(c1.getNumberOfEntries(), 0);
 
         // Reopen
         @Cleanup("shutdown")
         ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
-        ledger = factory2.open("my_test_ledger");
+        ledger = ManagedLedgerTestUtil.retry(() -> 
factory2.open("my_test_ledger"));
         ManagedCursor c2 = ledger.openCursor("c1");
 
-        assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+        // Since all entries are consumed, we should move mark delete position 
to nextLedgerId:-1.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        Awaitility.await().untilAsserted(
+                () -> 
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()));
     }
 
     @Test(timeOut = 20000)
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
new file mode 100644
index 00000000000..0acca723f8f
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class ManagedLedgerTestUtil {
+
+    public static <T> T retry(ThrowingSupplier<T> supplier) {
+        return retry(10, supplier);
+    }
+
+    public static <T> T retry(int retryCount, ThrowingSupplier<T> supplier) {
+        for (int i = 0; i < retryCount; i++) {
+            if (i > 0) {
+                try {
+                    log.info("Retrying after 100ms {}/{}", i, retryCount);
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            try {
+                return supplier.get();
+            } catch (Exception e) {
+                log.warn("Failed to execute supplier: {}", supplier, e);
+            }
+        }
+        throw new RuntimeException("Failed to execute supplier after " + 
retryCount  + " retries");
+    }
+
+    @FunctionalInterface
+    public interface ThrowingSupplier<T> {
+        T get() throws Exception;
+    }
+
+}

Reply via email to