poorbarcode commented on code in PR #21212:
URL: https://github.com/apache/pulsar/pull/21212#discussion_r1349743690


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java:
##########
@@ -3061,6 +3084,104 @@ public void testGlobalTopicPolicies() throws Exception {
 
     }
 
+    @Test
+    public void testInitPolicesCacheAndNotifyListeners() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        // set up policies
+        TopicName topicName = TopicName.get(topic);
+        TopicPolicies localInitPolicy = TopicPolicies.builder().maxConsumerPerTopic(10).build();
+        pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName, localInitPolicy).get();
+        TopicPolicies globalInitPolicy =
+                TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+        pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName, globalInitPolicy).get();
+
+        // the policies cache
+        SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
+        // reload namespace to trigger init polices cache and notify listeners
+        admin.namespaces().unload(myNamespace);
+        assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+                        && !topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+                        .isCompletedExceptionally(), true));
+
+        // the final policies take effect in topic
+        HierarchyTopicPolicies hierarchyTopicPolicies =
+                pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(), 10);
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName, true).getMaxConsumerPerTopic(), 20);
+        assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(), 20);
+        assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(), 10);
+        assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(), null);
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName, true).getMaxProducerPerTopic(), 30);
+        assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(), 30);
+        assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(), null);
+        assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 30);
+    }
+
+    @Test
+    public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws Exception {

Review Comment:
   The changes below solved this issue. 
   
   
https://github.com/apache/pulsar/pull/21212/files#diff-9d2948d863c111e4be6d508a1c573667a1326b98c4314e917ba9e344bb61dc27R665-L635
   
   <img width="970" alt="Screenshot 2023-10-09 00 11 31" src="https://github.com/apache/pulsar/assets/25195800/eb5525bf-505d-4589-85e2-85c554cf7dd4">
   
   @chenhongSZ 
   
   However, this fix also changes how the key is built, which makes it incompatible with keys written before the upgrade. For example:
   - The user updates local policies, and Pulsar sends a message with the key "persistent://public/default/topic_1".
   - The user upgrades the Pulsar cluster to the newest version.
   - The user updates local policies, and Pulsar sends a message with the key "persistent/public/default/topic_1/false".
   - The user updates global policies, and Pulsar sends a message with the key "persistent/public/default/topic_1/true".
   
   At this point the system topic holds three messages. That is not the intended state, but the behavior is still correct. It becomes wrong after the following steps:
   
   - Delete the topic
     - Pulsar deletes the key "persistent/public/default/topic_1/false"
     - Pulsar deletes the key "persistent/public/default/topic_1/true"
   - Create a new topic with the same name as the old one.
   
   At this point, the message whose key is "persistent://public/default/topic_1" is still in the system topic, so the recreated topic may pick up the old topic's policies.
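
The sequence described above can be replayed against a toy model of the compacted system topic, where a map keeps the latest value per key. This is only a minimal sketch and not Pulsar code: the class `StaleKeyScenario` and all of its methods are hypothetical, and the key strings are the ones quoted in this comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of a compacted topic (latest value per key). Hypothetical, not Pulsar code. */
final class StaleKeyScenario {
    private final Map<String, String> compacted = new LinkedHashMap<>();

    // Key format used before the upgrade, as quoted in the comment.
    static String oldKey(String domain, String path) {
        return domain + "://" + path;
    }

    // Key format introduced by the change under review, as quoted in the comment.
    static String newKey(String domain, String path, boolean isGlobal) {
        return domain + "/" + path + "/" + isGlobal;
    }

    void put(String key, String policies) {
        compacted.put(key, policies);
    }

    // After the upgrade, deleting the topic removes only the two new-format keys.
    void deleteTopicUsingNewKeys(String domain, String path) {
        compacted.remove(newKey(domain, path, false));
        compacted.remove(newKey(domain, path, true));
    }

    Set<String> keys() {
        return compacted.keySet();
    }

    // Replays the steps from the comment and returns the keys that survive.
    static Set<String> replay() {
        String domain = "persistent";
        String path = "public/default/topic_1";
        StaleKeyScenario systemTopic = new StaleKeyScenario();
        systemTopic.put(oldKey(domain, path), "local policies, written before the upgrade");
        systemTopic.put(newKey(domain, path, false), "local policies, written after the upgrade");
        systemTopic.put(newKey(domain, path, true), "global policies, written after the upgrade");
        systemTopic.deleteTopicUsingNewKeys(domain, path);
        return systemTopic.keys();
    }

    public static void main(String[] args) {
        // prints [persistent://public/default/topic_1]
        System.out.println(replay());
    }
}
```

The old-format entry is the only one left after the delete, which is the stale message the comment is concerned about.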
   
   I think that once the new key is written, the data under the old key should be deleted. Could you also add a test for this case?
   
   By the way, the original rule is `persistent://xxx`; please do not change it to `persistent/xxx`.
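
One possible reading of this suggestion, sketched against a plain map rather than the real system topic: writing under the new key also removes the old-format entry, and deleting the topic removes both formats. Everything here is an assumption made for illustration. The class `LegacyKeyCleanup`, its methods, and in particular the new-key layout (the `persistent://` name with the global flag appended) are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the suggested cleanup; none of these names exist in Pulsar. */
final class LegacyKeyCleanup {
    private final Map<String, String> compacted = new HashMap<>();

    // Pre-upgrade key: the full topic name, e.g. persistent://public/default/topic_1.
    static String legacyKey(String topicName) {
        return topicName;
    }

    // ASSUMPTION: a new-format key that keeps the persistent:// prefix and appends the global flag.
    static String newKey(String topicName, boolean isGlobal) {
        return topicName + "/" + isGlobal;
    }

    // Simulates a write made by a broker running the old version.
    void writeLegacy(String topicName, String policies) {
        compacted.put(legacyKey(topicName), policies);
    }

    // Writing under the new key also drops the legacy entry so it cannot outlive the topic.
    // A real implementation would have to decide whether to migrate the legacy value first.
    void write(String topicName, boolean isGlobal, String policies) {
        compacted.put(newKey(topicName, isGlobal), policies);
        compacted.remove(legacyKey(topicName));
    }

    // Deleting the topic removes both new-format keys and, defensively, the legacy key.
    void deleteTopic(String topicName) {
        compacted.remove(newKey(topicName, false));
        compacted.remove(newKey(topicName, true));
        compacted.remove(legacyKey(topicName));
    }

    boolean containsKey(String key) {
        return compacted.containsKey(key);
    }

    boolean isEmpty() {
        return compacted.isEmpty();
    }
}
```

A test for this case could follow the same shape: write with the old key, write with the new keys, delete the topic, and assert that no entry for the topic remains.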






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

