This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new e7c63d35eb2 cherry-pick -x "7c9ad1c"
e7c63d35eb2 is described below

commit e7c63d35eb2d53573e5e636cd1037062655dce7f
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 12 08:40:18 2022 +0800

    cherry-pick -x "7c9ad1c"
---
 .../pulsar/broker/service/MessageTTLTest.java      | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index e05ec328b41..1fb0d7e7917 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -22,12 +22,15 @@ import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.testng.Assert.assertEquals;
@@ -76,6 +79,8 @@ public class MessageTTLTest extends BrokerTestBase {
             sendFutureList.add(producer.sendAsync(message));
         }
         FutureUtil.waitForAll(sendFutureList).get();
+        MessageIdImpl firstMessageId = (MessageIdImpl) sendFutureList.get(0).get();
+        MessageIdImpl lastMessageId = (MessageIdImpl) sendFutureList.get(sendFutureList.size() - 1).get();
         producer.close();
         // unload a reload the topic
         // this action created a new ledger
@@ -87,19 +92,20 @@ public class MessageTTLTest extends BrokerTestBase {
         PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName);
         CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName);
         log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
-        assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString());
-
-        // wall clock time, we have to make the message to be considered "expired"
-        Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L);
-        log.info("***** run message expiry now");
-        this.runMessageExpiryCheck();
-
-        // verify that the markDeletePosition was moved forward, and exacly to the last message
-        PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
-        CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
-        log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
-        assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString());
+        assertEquals(statsBeforeExpire.markDeletePosition,
+                PositionImpl.get(firstMessageId.getLedgerId(), -1).toString());
 
+        Awaitility.await().timeout(30, TimeUnit.SECONDS)
+                .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            this.runMessageExpiryCheck();
+            log.info("***** run message expiry now");
+            // verify that the markDeletePosition was moved forward, and exacly to the last message
+            PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
+            CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
+            log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
+            assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(lastMessageId.getLedgerId(),
+                    lastMessageId.getEntryId() ).toString());
+        });
     }
 
 }

Reply via email to