This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6ce9de86014624bf082940248b1a54ac8d1877a3 Author: lipenghui <[email protected]> AuthorDate: Tue Jul 12 08:40:18 2022 +0800 [fix][flaky-test] MessageTTLTest.testMessageExpiryAfterTopicUnload (#16462) (cherry picked from commit 7c9ad1c6df971c06ca0da0688959a36914db7fde) --- .../pulsar/broker/service/MessageTTLTest.java | 31 +++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java index e05ec328b41..e3bf1e17749 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java @@ -22,12 +22,16 @@ import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.testng.Assert.assertEquals; @@ -76,6 +80,8 @@ public class MessageTTLTest extends BrokerTestBase { sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); + MessageIdImpl firstMessageId = (MessageIdImpl) sendFutureList.get(0).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) sendFutureList.get(sendFutureList.size() - 1).get(); producer.close(); // unload a reload the topic // this action created a new ledger @@ -87,19 +93,20 @@ public class MessageTTLTest extends BrokerTestBase { PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName); CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName); log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition); - assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString()); - - // wall clock time, we have to make the message to be considered "expired" - Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L); - log.info("***** run message expiry now"); - this.runMessageExpiryCheck(); - - // verify that the markDeletePosition was moved forward, and exacly to the last message - PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName); - CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName); - log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition); - assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString()); + assertEquals(statsBeforeExpire.markDeletePosition, + PositionImpl.get(firstMessageId.getLedgerId(), -1).toString()); + Awaitility.await().timeout(30, TimeUnit.SECONDS) + .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> { + this.runMessageExpiryCheck(); + log.info("***** run message expiry now"); + // verify that the markDeletePosition was moved forward, and exacly to the last message + PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName); + CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName); + log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition); + assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(lastMessageId.getLedgerId(), + lastMessageId.getEntryId() ).toString()); + }); } }
