oneby-wang commented on code in PR #25101:
URL: https://github.com/apache/pulsar/pull/25101#discussion_r2639836537


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1385,16 +1391,182 @@ public void markDeleteComplete(Object ctx) {
         }
 
         latch.await();
-
         assertEquals(c1.getNumberOfEntries(), 0);
 
+        // Sleep 1s here to wait ledger rollover finished
+        Thread.sleep(1000);
+
         // Reopen
-        @Cleanup("shutdown")
-        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition:
+        // 1. my_test_ledger ledger rollover triggers cursor.asyncMarkDelete() operation.
+        // 2. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger.
+        // 3. cursor.asyncMarkDelete() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo
+        //    into metaStore.
+        // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo
+        //    into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++
         ledger = factory2.open("my_test_ledger");
         ManagedCursor c2 = ledger.openCursor("c1");
 
-        assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+        // Three cases:
+        // 1. cursor recovered with lastPosition markDeletePosition
+        // 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we
+        //    move markDeletePosition to (lastPositionLegderId+2:-1)
+        // 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we
+        //    move markDeletePosition to (lastPositionLegderId+3:-1)
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition);
+        long lastPositionLedgerId = lastPosition.get().getLedgerId();
+        Awaitility.await().untilAsserted(() ->
+                assertTrue(
+                        c2.getMarkDeletedPosition().equals(lastPosition.get())
+                        || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 2, -1))
+                        || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 3, -1))
+                )
+        );
+    }
+
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+        ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+        // just for log debug purpose
+        Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+        // In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test
+        // more flaky. Flaky case:
+        //   1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9.
+        //   2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1.
+        // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger,
+        // and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0.
+        final int num = 101;
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        // 10 entries per ledger, create ledger 4~13
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    lastPosition.set(position);
+                    positions.offer(position);
+                    addEntryLatch.countDown();
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+        assertEquals(lastPosition.get(), PositionFactory.create(13, 0));

Review Comment:
   > Unit tests should be tied to implementation details too closely. 
   
   Got it, thanks for clearing that up. I think the quoted sentence has a typo, though: unit tests shouldn't be tied to implementation details too closely.
   
   I'll modify all these assertions.
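
One possible shape for such an assertion, as a rough sketch only: instead of comparing against a hard-coded position such as `13:0`, the test could check that the recovered mark-delete position is not behind the last position it observed itself. The `Pos` class and `isAtOrAfter` helper below are hypothetical stand-ins so that the snippet is self-contained; they are not the real `Position` / `PositionFactory` API, and the ordering rule (ledger id first, then entry id) is an assumption made for illustration.

```java
public class RelativeAssertionSketch {

    // Hypothetical stand-in for a managed-ledger position (ledgerId:entryId).
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Returns true when 'recovered' is at or after 'lastSeen', ordering by
    // ledger id first and entry id second. No absolute ledger id is assumed.
    static boolean isAtOrAfter(Pos recovered, Pos lastSeen) {
        if (recovered.ledgerId != lastSeen.ledgerId) {
            return recovered.ledgerId > lastSeen.ledgerId;
        }
        return recovered.entryId >= lastSeen.entryId;
    }

    public static void main(String[] args) {
        // The last position is whatever the add-entry callbacks reported,
        // not a value derived from how many ledgers were expected to exist.
        Pos lastSeen = new Pos(13, 0);

        // The three recovery outcomes listed in the diff comments all satisfy the check.
        Pos[] outcomes = {
            new Pos(lastSeen.ledgerId, lastSeen.entryId),
            new Pos(lastSeen.ledgerId + 2, -1),
            new Pos(lastSeen.ledgerId + 3, -1),
        };
        for (Pos p : outcomes) {
            if (!isAtOrAfter(p, lastSeen)) {
                throw new AssertionError("recovered position is behind lastSeen");
            }
        }
        System.out.println("all recovery outcomes accepted");
    }
}
```

A check like this stays valid if ledger id allocation or rollover timing changes, at the cost of being weaker than an exact equality check.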


