This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3832439a043b498fe3f7a675fc1054c8d877d750 Author: JiangHaiting <[email protected]> AuthorDate: Mon Nov 22 09:57:46 2021 +0800 [broker] Fix topic policy listener deleted by mistake. (#12904) ### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of #12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up. (cherry picked from commit a0c96a08de83de6ce51fffd06e907833f075bbca) --- .../broker/service/persistent/PersistentTopic.java | 20 ++++++++++-------- .../SystemTopicBasedTopicPoliciesServiceTest.java | 24 ++++++++++++++++++++++ .../org/apache/pulsar/common/naming/TopicName.java | 8 ++++++++ 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3e7d733..5e884ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1155,8 +1155,7 @@ public class PersistentTopic extends AbstractTopic subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - brokerService.pulsar().getTopicPoliciesService() - .clean(TopicName.get(topic)); + unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -1259,7 +1258,7 @@ public class PersistentTopic extends AbstractTopic subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic)); + unregisterTopicPolicyListener(); log.info("[{}] Topic closed", topic); closeFuture.complete(null); }) @@ -3156,13 +3155,16 @@ public class PersistentTopic extends AbstractTopic private void registerTopicPolicyListener() { if (brokerService.pulsar().getConfig().isSystemTopicEnabled() && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { - TopicName topicName = TopicName.get(topic); - TopicName cloneTopicName = topicName; - if (topicName.isPartitioned()) { - cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); - } + brokerService.getPulsar().getTopicPoliciesService() + .registerListener(TopicName.getPartitionedTopicName(topic), this); + } + } - brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this); + private void unregisterTopicPolicyListener() { + if (brokerService.pulsar().getConfig().isSystemTopicEnabled() + && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { + brokerService.getPulsar().getTopicPoliciesService() + .unregisterListener(TopicName.getPartitionedTopicName(topic), this); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index be52f9a..9a489cf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -259,6 +259,30 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic assertNull(listMap.get(topicName)); } + @Test + public void testListenerCleanupByPartition() throws Exception { + final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); + TopicName topicName = TopicName.get(topic); + admin.topics().createPartitionedTopic(topic, 3); + pulsarClient.newProducer().topic(topic).create().close(); + + Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listMap = + systemTopicBasedTopicPoliciesService.getListeners(); + Awaitility.await().untilAsserted(() -> { + // all 3 topic partition have registered the topic policy listeners. + assertEquals(listMap.get(topicName).size(), 3); + }); + + admin.topics().unload(topicName.getPartition(0).toString()); + assertEquals(listMap.get(topicName).size(), 2); + admin.topics().unload(topicName.getPartition(1).toString()); + assertEquals(listMap.get(topicName).size(), 1); + admin.topics().unload(topicName.getPartition(2).toString()); + assertNull(listMap.get(topicName)); + } + + + private void prepareData() throws PulsarAdminException { admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); admin.tenants().createTenant("system-topic", diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index b8537d2..1e91f81 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -94,6 +94,14 @@ public class TopicName implements ServiceUnitId { } } + public static TopicName getPartitionedTopicName(String topic) { + TopicName topicName = TopicName.get(topic); + if (topicName.isPartitioned()) { + return TopicName.get(topicName.getPartitionedTopicName()); + } + return topicName; + } + public static boolean isValid(String topic) { try { get(topic);
