This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3832439a043b498fe3f7a675fc1054c8d877d750
Author: JiangHaiting <[email protected]>
AuthorDate: Mon Nov 22 09:57:46 2021 +0800

    [broker] Fix topic policy listener deleted by mistake. (#12904)
    
    ### Motivation
    
    Here is how PersistentTopic currently handles topic policy listeners.
    As an example, take a topic named "A" with 3 partitions.
    - Register: call TopicPoliciesService.registerListener("A", listener), for 
all 3 partitions of topic "A".
    - Clean: call TopicPoliciesService.clean("A-partition-x"). This is the
    problem: it deletes the listeners of all partitions of topic "A" as soon as
    any single partition is closed.
    
    This means that if we call `admin.topics().unload("A-partition-0")`,
    "A-partition-1" and "A-partition-2" will no longer receive topic policy
    update callbacks.
    
    The new unit test `testListenerCleanupByPartition` reproduces this case in
    detail.
    
    ### Modifications
    
    With the earlier optimization from #12654, we can now use
    `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener`
    to do the cleanup.
    
    (cherry picked from commit a0c96a08de83de6ce51fffd06e907833f075bbca)
---
 .../broker/service/persistent/PersistentTopic.java | 20 ++++++++++--------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 24 ++++++++++++++++++++++
 .../org/apache/pulsar/common/naming/TopicName.java |  8 ++++++++
 3 files changed, 43 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3e7d733..5e884ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1155,8 +1155,7 @@ public class PersistentTopic extends AbstractTopic
 
                                             
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
 
-                                            
brokerService.pulsar().getTopicPoliciesService()
-                                                    
.clean(TopicName.get(topic));
+                                            unregisterTopicPolicyListener();
 
                                             log.info("[{}] Topic deleted", 
topic);
                                             deleteFuture.complete(null);
@@ -1259,7 +1258,7 @@ public class PersistentTopic extends AbstractTopic
 
                                 
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
 
-                                
brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic));
+                                unregisterTopicPolicyListener();
                                 log.info("[{}] Topic closed", topic);
                                 closeFuture.complete(null);
                             })
@@ -3156,13 +3155,16 @@ public class PersistentTopic extends AbstractTopic
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
-            TopicName topicName = TopicName.get(topic);
-            TopicName cloneTopicName = topicName;
-            if (topicName.isPartitioned()) {
-                cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-            }
+            brokerService.getPulsar().getTopicPoliciesService()
+                    
.registerListener(TopicName.getPartitionedTopicName(topic), this);
+        }
+    }
 
-            
brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName,
 this);
+    private void unregisterTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    
.unregisterListener(TopicName.getPartitionedTopicName(topic), this);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index be52f9a..9a489cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -259,6 +259,30 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         assertNull(listMap.get(topicName));
     }
 
+    @Test
+    public void testListenerCleanupByPartition() throws Exception {
+        final String topic = "persistent://" + NAMESPACE1 + "/test" + 
UUID.randomUUID();
+        TopicName topicName = TopicName.get(topic);
+        admin.topics().createPartitionedTopic(topic, 3);
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listMap =
+                systemTopicBasedTopicPoliciesService.getListeners();
+        Awaitility.await().untilAsserted(() -> {
+            // all 3 topic partition have registered the topic policy 
listeners.
+            assertEquals(listMap.get(topicName).size(), 3);
+        });
+
+        admin.topics().unload(topicName.getPartition(0).toString());
+        assertEquals(listMap.get(topicName).size(), 2);
+        admin.topics().unload(topicName.getPartition(1).toString());
+        assertEquals(listMap.get(topicName).size(), 1);
+        admin.topics().unload(topicName.getPartition(2).toString());
+        assertNull(listMap.get(topicName));
+    }
+
+
+
     private void prepareData() throws PulsarAdminException {
         admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
         admin.tenants().createTenant("system-topic",
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index b8537d2..1e91f81 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -94,6 +94,14 @@ public class TopicName implements ServiceUnitId {
         }
     }
 
+    public static TopicName getPartitionedTopicName(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        if (topicName.isPartitioned()) {
+            return TopicName.get(topicName.getPartitionedTopicName());
+        }
+        return topicName;
+    }
+
     public static boolean isValid(String topic) {
         try {
             get(topic);

Reply via email to