lhotari commented on code in PR #25101:
URL: https://github.com/apache/pulsar/pull/25101#discussion_r2639617609
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1385,16 +1391,182 @@ public void markDeleteComplete(Object ctx) {
}
latch.await();
-
assertEquals(c1.getNumberOfEntries(), 0);
+ // Sleep 1s here to wait for the ledger rollover to finish
+ Thread.sleep(1000);
+
// Reopen
- @Cleanup("shutdown")
- ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // Flaky test case: factory2.open() may throw
MetadataStoreException$BadVersionException due to a race condition:
+ // 1. my_test_ledger ledger rollover triggers cursor.asyncMarkDelete()
operation.
+ // 2. factory2.open() triggers ledger recovery and reads versionA of the
ManagedLedgerInfo of the my_test_ledger ledger.
+ // 3. cursor.asyncMarkDelete() triggers
MetaStoreImpl.asyncUpdateLedgerIds(), which writes versionB of the ManagedLedgerInfo
+ // into the metaStore.
+ // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(),
which tries to update the ManagedLedgerInfo based on the stale versionA
+ // in the metaStore, then throws BadVersionException and moves the
my_test_ledger ledger to the fenced state.
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ // Recover and reopen the my_test_ledger ledger,
ledgerId++
ledger = factory2.open("my_test_ledger");
ManagedCursor c2 = ledger.openCursor("c1");
- assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+ // Three cases:
+ // 1. cursor recovered with lastPosition markDeletePosition
+ // 2. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition and the cursor ledger was not rolled over: we
+ // move markDeletePosition to (lastPositionLedgerId+2:-1)
+ // 3. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition and the cursor ledger was rolled over: we
+ // move markDeletePosition to (lastPositionLedgerId+3:-1)
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ log.info("c2 markDeletePosition: {}, lastPosition: {}",
c2.getMarkDeletedPosition(), lastPosition);
+ long lastPositionLedgerId = lastPosition.get().getLedgerId();
+ Awaitility.await().untilAsserted(() ->
+ assertTrue(
+ c2.getMarkDeletedPosition().equals(lastPosition.get())
+ ||
c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId
+ 2, -1))
+ ||
c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId
+ 3, -1))
+ )
+ );
+ }
+
+ @Test(timeOut = 20000)
+ public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+ ManagedLedger ledger =
factory.open("async_mark_delete_blocking_test_ledger", config);
+ final ManagedCursor c1 = ledger.openCursor("c1");
+ final AtomicReference<Position> lastPosition = new AtomicReference<>();
+ // Used only for debug logging
+ Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+ // In the previous (flaky) test we set num=100; PR
https://github.com/apache/pulsar/pull/25087 makes that test
+ // even more flaky. Flaky cases:
+ // 1. cursor recovered with markDeletePosition 12:9,
persistentMarkDeletePosition 12:9.
+ // 2. cursor recovered with markDeletePosition 13:-1,
persistentMarkDeletePosition 13:-1.
+ // Here we set num to 101 to make sure ledger 13 is created and
becomes the active (last) ledger,
+ // so the cursor will always be recovered with markDeletePosition 13:0,
persistentMarkDeletePosition 13:0.
+ final int num = 101;
+ final CountDownLatch addEntryLatch = new CountDownLatch(num);
+ // 10 entries per ledger, create ledger 4~13
+ for (int i = 0; i < num; i++) {
+ String entryStr = "entry-" + i;
+ ledger.asyncAddEntry(entryStr.getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ lastPosition.set(position);
+ positions.offer(position);
+ addEntryLatch.countDown();
+ }
+ }, null);
+ }
+ addEntryLatch.await();
+ assertEquals(lastPosition.get(), PositionFactory.create(13, 0));
+
+ // If we set num=100, then to avoid a flaky test we would need to add
Thread.sleep(1000) here to make sure the ledger rollover
+ // has finished, but this sleep cannot guarantee that c1 is always recovered
with markDeletePosition 12:9.
+ // Thread.sleep(1000);
+
+ final CountDownLatch markDeleteLatch = new CountDownLatch(1);
+ // Mark delete; this creates ledger 14 because the cursor ledger state is
NoLedger.
+ // In the flaky num=100 case, the markDelete operation is triggered
twice:
+ // 1. the first is triggered by c1.asyncMarkDelete(), with markDeletePosition
12:9.
+ // 2. the second is triggered by
ManagedLedgerImpl.updateLedgersIdsComplete() due to the full-ledger rollover.
+ // All entries in ledger 12 are consumed, so
persistentMarkDeletePosition and
+ // markDeletePosition are moved to 13:-1 because of PR
https://github.com/apache/pulsar/pull/25087.
+ // Before this PR, persistentMarkDeletePosition was not moved.
+ // The two markDelete operations are triggered almost simultaneously,
without any ordering guarantee:
+ // 1. main thread triggered c1.asyncMarkDelete.
+ // 2. the bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggered
the creation of ledger 13 due to the full-ledger
+ // rollover in OpAddEntry.
+ // OpAddEntry closes the current ledger and creates a new one when closeWhenDone is
true.
+ // In the ManagedLedgerImpl class, MetaStoreCallback cb calls
maybeUpdateCursorBeforeTrimmingConsumedLedger(),
+ // which calls cursor.asyncMarkDelete(), so the markDelete operation in the
ledger rollover may execute after
+ // AddEntryCallback.addComplete(). The root cause is that
cursor.asyncMarkDelete() does not propagate completion or
+ // failure to its caller's callback.
+ c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ }
+
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ markDeleteLatch.countDown();
+ }
+ }, null);
+ markDeleteLatch.await();
+ assertEquals(c1.getNumberOfEntries(), 0);
+
+ // Reopen
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // Recover and reopen async_mark_delete_blocking_test_ledger, creating
ledger 15.
+ // When executing
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the
curPointedLedger is 13,
+ // the nextPointedLedger is 15, and ledger 13 only has 1 consumed entry
13:0,
+ // so markDeletePosition is moved to 15:-1, see PR
https://github.com/apache/pulsar/pull/25087.
+ ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+ ManagedCursor c2 = ledger.openCursor("c1");
+
+ log.info("positions size: {}, positions: {}", positions.size(),
positions);
+ // To make sure
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed,
we should
+ // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR
https://github.com/apache/pulsar/pull/25087.
+ Awaitility.await()
+ .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(),
PositionFactory.create(15, -1)));
Review Comment:
Remove the assertions on specific positions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]