This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3d476336327720237a28bb1f3d84da39a08b09cd Author: hangc0276 <[email protected]> AuthorDate: Tue Jun 29 08:51:06 2021 +0800 Fix replay topic policy message not work (#11136) ### Motivation When a topic-level retention policy is set for a topic and the broker is then restarted, the topic-level retention policy no longer takes effect. The reason is that when the `__change_events` topic messages are replayed during the `initPolicesCache` stage, a reader is created that reads messages from the earliest position, and each message is passed to the listeners to update the policy for the corresponding topic. When a topic policy is updated, the `getTopicPolicies` method is called. ```Java public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) && !policyCacheInitMap.get(topicName.getNamespaceObject())) { throw new TopicPoliciesCacheNotInitException(); } return policiesCache.get(topicName); } ``` This method checks `policyCacheInitMap` to see whether the cache for the given namespace has been initialized. However, until all messages have been replayed, the namespace's entry in `policyCacheInitMap` remains in the not-initialized state. Thus `getTopicPolicies` throws `TopicPoliciesCacheNotInitException`, and replaying the topic policy message fails. ### Modification 1. Replay all policy messages after the namespace has been marked as initialized in `policyCacheInitMap`. 2. Add a retention policy check to the broker restart test. 
(cherry picked from commit e263e09b1fd6fbe99617edb29b966bed3f5d7c0e) --- .../SystemTopicBasedTopicPoliciesService.java | 10 ++++++++- .../pulsar/broker/admin/TopicPoliciesTest.java | 25 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index fb72f3b..29d004d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -251,7 +251,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); } refreshTopicPoliciesCache(msg); - notifyListener(msg); if (log.isDebugEnabled()) { log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject()); @@ -264,6 +263,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + + // replay policy message + policiesCache.forEach(((topicName, topicPolicies) -> { + if (listeners.get(topicName) != null) { + for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) { + listener.onUpdate(topicPolicies); + } + } + })); future.complete(null); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 5f98795..15cd6bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -615,6 +615,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Awaitility.await().atMost(3, TimeUnit.SECONDS) .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + // set namespace level inactive topic policies InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true); admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies); @@ -623,13 +624,28 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up)); + // set namespace retention policies + final RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1); + admin.namespaces().setRetention(myNamespace, retentionPolicies); + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(myNamespace), + retentionPolicies)); + + // set topic level inactive topic policies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false); admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; Awaitility.await() - .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies)); + .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), + finalInactiveTopicPolicies)); + + // set topic level retention policies + final RetentionPolicies finalRetentionPolicies = new RetentionPolicies(20, -1); + admin.topics().setRetention(topic, finalRetentionPolicies); + Awaitility.await() + .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getRetention(topic), finalRetentionPolicies)); // restart broker, policy should still take effect restartBroker(); @@ -637,11 +653,16 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { // Trigger the cache init. Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); - + // check inactive topic policies and retention policies. Awaitility.await() .untilAsserted(() -> { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies); + Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(), + finalRetentionPolicies.getRetentionSizeInMB()); + Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(), + TimeUnit.MINUTES.toMillis(finalRetentionPolicies.getRetentionTimeInMinutes())); }); producer.close();
