This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d9a43dd2160 [fix][test] Flaky-test:
testMessageExpiryWithTimestampNonRecoverableException and
testIncorrectClientClock (#22489)
d9a43dd2160 is described below
commit d9a43dd21605930e16bb038095e36fceff3a4a40
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Apr 15 13:55:34 2024 +0800
[fix][test] Flaky-test:
testMessageExpiryWithTimestampNonRecoverableException and
testIncorrectClientClock (#22489)
---
.../service/PersistentMessageFinderTest.java | 42 +++++++++++-----------
1 file changed, 22 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6965ac28068..0972c9098b5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
@@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
*
* @throws Exception
*/
- @Test(groups = "flaky")
+ @Test
void testMessageExpiryWithTimestampNonRecoverableException() throws
Exception {
final String ledgerAndCursorName =
"testPersistentMessageExpiryWithNonRecoverableLedgers";
@@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
for (int i = 0; i < totalEntries; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i));
}
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(ledger.getState(),
ManagedLedgerImpl.State.LedgerOpened));
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-
- assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+ // The `lastLedgerInfo` should be newly opened, and it does not
contain any entries.
+ // Please refer to: https://github.com/apache/pulsar/pull/22034
+ assertEquals(lastLedgerInfo.getEntries(), 0);
+ assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
// this will make sure that all entries should be deleted
Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
@@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
- Position previousMarkDelete = null;
- for (int i = 0; i < totalEntries; i++) {
- monitor.expireMessages(1);
- Position previousPos = previousMarkDelete;
- retryStrategically(
- (test) -> c1.getMarkDeletedPosition() != null &&
!c1.getMarkDeletedPosition().equals(previousPos),
- 5, 100);
- previousMarkDelete = c1.getMarkDeletedPosition();
- }
-
- PositionImpl markDeletePosition = (PositionImpl)
c1.getMarkDeletedPosition();
- assertEquals(lastLedgerInfo.getLedgerId(),
markDeletePosition.getLedgerId());
- assertEquals(lastLedgerInfo.getEntries() - 1,
markDeletePosition.getEntryId());
+ assertTrue(monitor.expireMessages(ttlSeconds));
+ Awaitility.await().untilAsserted(() -> {
+ PositionImpl markDeletePosition = (PositionImpl)
c1.getMarkDeletedPosition();
+ // The markDeletePosition points to the last entry of the previous
ledger in lastLedgerInfo.
+ assertEquals(markDeletePosition.getLedgerId(),
lastLedgerInfo.getLedgerId() - 1);
+ assertEquals(markDeletePosition.getEntryId(), entriesPerLedger -
1);
+ });
c1.close();
ledger.close();
@@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
- @Test(groups = "flaky")
+ @Test
public void testIncorrectClientClock() throws Exception {
final String ledgerAndCursorName = "testIncorrectClientClock";
int maxTTLSeconds = 1;
+ int entriesNum = 10;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
// set client clock to 10 days later
long incorrectPublishTimestamp = System.currentTimeMillis() +
TimeUnit.DAYS.toMillis(10);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < entriesNum; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i,
incorrectPublishTimestamp));
}
- assertEquals(ledger.getLedgersInfoAsList().size(), 10);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(ledger.getState(),
ManagedLedgerImpl.State.LedgerOpened));
+ // The number of ledgers should be (entriesNum / MaxEntriesPerLedger)
+ 1
+ // Please refer to: https://github.com/apache/pulsar/pull/22034
+ assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);