This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 51c1985356fbb50e647f135c8addcebcab962f98 Author: Xiaoyu Hou <[email protected]> AuthorDate: Wed Jun 15 14:35:03 2022 +0800 [fix][broker]Fix topic policies update not check message expiry (#15941) (cherry picked from commit cb0cffd6a03799dbbffa54813ebaddba0535787e) --- .../broker/service/persistent/PersistentTopic.java | 2 +- .../pulsar/broker/service/MessageTTLTest.java | 34 ++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a82bdb8d286..ad7903ba8d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3070,7 +3070,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate())); } replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - + checkMessageExpiry(); if (policies.getReplicationClusters() != null) { checkReplicationAndRetryOnFailure(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java index e05ec328b41..31556197486 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java @@ -18,19 +18,26 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import com.google.common.collect.Lists; - import java.util.List; import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.testng.Assert.assertEquals; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -102,4 +109,27 @@ public class MessageTTLTest extends BrokerTestBase { } + @Test + public void testTTLPoliciesUpdate() throws Exception { + final String namespace = "prop/ns-abc"; + final String topicName = "persistent://" + namespace + "/testTTLPoliciesUpdate"; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + + PersistentTopic topicRefMock = spy(topicRef); + + // Namespace polices must be initiated from admin, which contains `replication_clusters` + Policies policies = admin.namespaces().getPolicies(namespace); + policies.message_ttl_in_seconds = 10; + topicRefMock.onPoliciesUpdate(policies); + verify(topicRefMock, times(1)).checkMessageExpiry(); + + TopicPolicies topicPolicies = new TopicPolicies(); + topicPolicies.setMessageTTLInSeconds(5); + topicRefMock.onUpdate(topicPolicies); + verify(topicRefMock, times(2)).checkMessageExpiry(); + } }
