This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a43dd2160 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
d9a43dd2160 is described below

commit d9a43dd21605930e16bb038095e36fceff3a4a40
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Apr 15 13:55:34 2024 +0800

    [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
---
 .../service/PersistentMessageFinderTest.java       | 42 +++++++++++-----------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6965ac28068..0972c9098b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
@@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
      *
      * @throws Exception
      */
-    @Test(groups = "flaky")
+    @Test
     void testMessageExpiryWithTimestampNonRecoverableException() throws 
Exception {
 
         final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
@@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         for (int i = 0; i < totalEntries; i++) {
             ledger.addEntry(createMessageWrittenToLedger("msg" + i));
         }
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
 
         List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
         LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-
-        assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+        // The `lastLedgerInfo` should be newly opened, and it does not contain any entries.
+        // Please refer to: https://github.com/apache/pulsar/pull/22034
+        assertEquals(lastLedgerInfo.getEntries(), 0);
+        assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
 
         // this will make sure that all entries should be deleted
         Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
@@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
 
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
-        Position previousMarkDelete = null;
-        for (int i = 0; i < totalEntries; i++) {
-            monitor.expireMessages(1);
-            Position previousPos = previousMarkDelete;
-            retryStrategically(
-                    (test) -> c1.getMarkDeletedPosition() != null && 
!c1.getMarkDeletedPosition().equals(previousPos),
-                    5, 100);
-            previousMarkDelete = c1.getMarkDeletedPosition();
-        }
-
-        PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
-        assertEquals(lastLedgerInfo.getLedgerId(), 
markDeletePosition.getLedgerId());
-        assertEquals(lastLedgerInfo.getEntries() - 1, 
markDeletePosition.getEntryId());
+        assertTrue(monitor.expireMessages(ttlSeconds));
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
+            // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo.
+            assertEquals(markDeletePosition.getLedgerId(), 
lastLedgerInfo.getLedgerId() - 1);
+            assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 
1); 
+        });
 
         c1.close();
         ledger.close();
@@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     }
 
-    @Test(groups = "flaky")
+    @Test
     public void testIncorrectClientClock() throws Exception {
         final String ledgerAndCursorName = "testIncorrectClientClock";
         int maxTTLSeconds = 1;
+        int entriesNum = 10;
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(1);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
         ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
         // set client clock to 10 days later
         long incorrectPublishTimestamp = System.currentTimeMillis() + 
TimeUnit.DAYS.toMillis(10);
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < entriesNum; i++) {
             ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
         }
-        assertEquals(ledger.getLedgersInfoAsList().size(), 10);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
+        // The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1
+        // Please refer to: https://github.com/apache/pulsar/pull/22034
+        assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1);
         PersistentTopic mock = mock(PersistentTopic.class);
         when(mock.getName()).thenReturn("topicname");
         when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

Reply via email to