This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f8ac8ec0df51917475957a9d5dde36da1b4293d3 Author: lipenghui <[email protected]> AuthorDate: Tue Jul 12 08:40:18 2022 +0800 [fix][flaky-test] MessageTTLTest.testMessageExpiryAfterTopicUnload (#16462) (cherry picked from commit 7c9ad1c6df971c06ca0da0688959a36914db7fde) --- .../pulsar/broker/service/MessageTTLTest.java | 33 +++++++++++++--------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java index 31556197486..76f09377edc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java @@ -26,16 +26,20 @@ import static org.testng.Assert.assertNotNull; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -83,6 +87,8 @@ public class MessageTTLTest extends BrokerTestBase { sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); + MessageIdImpl firstMessageId = (MessageIdImpl) sendFutureList.get(0).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) sendFutureList.get(sendFutureList.size() - 1).get(); producer.close(); // unload a reload the topic // this action created a new ledger @@ -94,19 +100,20 @@ public class MessageTTLTest extends BrokerTestBase { PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName); CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName); log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition); - assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString()); - - // wall clock time, we have to make the message to be considered "expired" - Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L); - log.info("***** run message expiry now"); - this.runMessageExpiryCheck(); - - // verify that the markDeletePosition was moved forward, and exacly to the last message - PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName); - CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName); - log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition); - assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString()); - + assertEquals(statsBeforeExpire.markDeletePosition, + PositionImpl.get(firstMessageId.getLedgerId(), -1).toString()); + + Awaitility.await().timeout(30, TimeUnit.SECONDS) + .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> { + this.runMessageExpiryCheck(); + log.info("***** run message expiry now"); + // verify that the markDeletePosition was moved forward, and exacly to the last message + PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName); + CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName); + log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition); + assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(lastMessageId.getLedgerId(), + lastMessageId.getEntryId() ).toString()); + }); } @Test
