This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e7c63d35eb2 cherry-pick -x "7c9ad1c"
e7c63d35eb2 is described below
commit e7c63d35eb2d53573e5e636cd1037062655dce7f
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 12 08:40:18 2022 +0800
cherry-pick -x "7c9ad1c"
---
.../pulsar/broker/service/MessageTTLTest.java | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index e05ec328b41..1fb0d7e7917 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -22,12 +22,15 @@ import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.testng.Assert.assertEquals;
@@ -76,6 +79,8 @@ public class MessageTTLTest extends BrokerTestBase {
sendFutureList.add(producer.sendAsync(message));
}
FutureUtil.waitForAll(sendFutureList).get();
+ MessageIdImpl firstMessageId = (MessageIdImpl)
sendFutureList.get(0).get();
+ MessageIdImpl lastMessageId = (MessageIdImpl)
sendFutureList.get(sendFutureList.size() - 1).get();
producer.close();
// unload a reload the topic
// this action created a new ledger
@@ -87,19 +92,20 @@ public class MessageTTLTest extends BrokerTestBase {
PersistentTopicInternalStats internalStatsBeforeExpire =
admin.topics().getInternalStats(topicName);
CursorStats statsBeforeExpire =
internalStatsBeforeExpire.cursors.get(subscriptionName);
log.info("markDeletePosition before expire {}",
statsBeforeExpire.markDeletePosition);
- assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3,
-1).toString());
-
- // wall clock time, we have to make the message to be considered
"expired"
- Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L);
- log.info("***** run message expiry now");
- this.runMessageExpiryCheck();
-
- // verify that the markDeletePosition was moved forward, and exacly to
the last message
- PersistentTopicInternalStats internalStatsAfterExpire =
admin.topics().getInternalStats(topicName);
- CursorStats statsAfterExpire =
internalStatsAfterExpire.cursors.get(subscriptionName);
- log.info("markDeletePosition after expire {}",
statsAfterExpire.markDeletePosition);
- assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3,
numMsgs - 1 ).toString());
+ assertEquals(statsBeforeExpire.markDeletePosition,
+ PositionImpl.get(firstMessageId.getLedgerId(), -1).toString());
+ Awaitility.await().timeout(30, TimeUnit.SECONDS)
+ .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ this.runMessageExpiryCheck();
+ log.info("***** run message expiry now");
+ // verify that the markDeletePosition was moved forward, and
exacly to the last message
+ PersistentTopicInternalStats internalStatsAfterExpire =
admin.topics().getInternalStats(topicName);
+ CursorStats statsAfterExpire =
internalStatsAfterExpire.cursors.get(subscriptionName);
+ log.info("markDeletePosition after expire {}",
statsAfterExpire.markDeletePosition);
+ assertEquals(statsAfterExpire.markDeletePosition,
PositionImpl.get(lastMessageId.getLedgerId(),
+ lastMessageId.getEntryId() ).toString());
+ });
}
}