chenhongSZ commented on code in PR #21212:
URL: https://github.com/apache/pulsar/pull/21212#discussion_r1349857137


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java:
##########
@@ -3061,6 +3084,104 @@ public void testGlobalTopicPolicies() throws Exception {
 
     }
 
+    @Test
+    public void testInitPolicesCacheAndNotifyListeners() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        // set up policies
+        TopicName topicName = TopicName.get(topic);
+        TopicPolicies localInitPolicy = 
TopicPolicies.builder().maxConsumerPerTopic(10).build();
+        pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
localInitPolicy).get();
+        TopicPolicies globalInitPolicy =
+                
TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+        pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
globalInitPolicy).get();
+
+        // the policies cache
+        SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        // reload namespace to trigger init polices cache and notify listeners
+        admin.namespaces().unload(myNamespace);
+        
assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(
+                () -> 
assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+                        && 
!topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+                        .isCompletedExceptionally(), true));
+
+        // the final policies take effect in topic
+        HierarchyTopicPolicies hierarchyTopicPolicies =
+                
pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
+        
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(),
 10);
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName, 
true).getMaxConsumerPerTopic(), 20);
+        
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
 20);
+        
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
 10);
+        assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 
10);
+
+        
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(),
 null);
+        assertEquals(topicPoliciesService.getTopicPolicies(topicName, 
true).getMaxProducerPerTopic(), 30);
+        
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
 30);
+        
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
 null);
+        assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 
30);
+    }
+
+    @Test
+    public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws 
Exception {

Review Comment:
   > @chenhongSZ
   > 
   > Sorry, there is a mistake in the test. Could you change the line like 
below? Thanks
   > 
   > ```java
   > private void triggerAndWaitNewTopicCompaction(String topicName) throws 
Exception {
   >     PersistentTopic tp =
   >             (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
   >     // Wait for the old task finish.
   >     Awaitility.await().untilAsserted(() -> {
   >         CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
   >         assertTrue(compactionTask == null || compactionTask.isDone());
   >     });
   >     // Trigger a new task.
   >     tp.triggerCompaction();
   >     // Wait for the new task finish.
   >     Awaitility.await().untilAsserted(() -> {
   >         CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
   >         assertTrue(compactionTask == null || compactionTask.isDone());
   >     });
   > }
   > 
   > @Test
   > public void testLocalPolicyAffectAfterCompaction() throws Exception {
   >     final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
   >     final String tpNameChangeEvents = "persistent://" + myNamespace + "/" 
+ NAMESPACE_EVENTS_LOCAL_NAME;
   >     final String subscriptionName = "s1";
   >     final int rateMsgLocal = 2000;
   >     final int rateMsgGlobal = 1000;
   >     admin.topics().createNonPartitionedTopic(tpName);
   >     admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
   > 
   >     // Set global policy and local policy.
   >     DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, 
false, 1);
   >     DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
   >     admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
   >     admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
   > 
   >     // Trigger __change_events compaction.
   >     triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
   > 
   >     // Create a new SystemTopicBasedTopicPoliciesService and verify the 
local policies was affected.
   >     Optional<TopicPolicies> topicPoliciesOptional =
   >             new 
SystemTopicBasedTopicPoliciesService(pulsar).getTopicPoliciesAsync(TopicName.get(tpName)).join();
   >     assertTrue(topicPoliciesOptional.isPresent());
   >     
assertEquals(topicPoliciesOptional.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
   >             rateMsgLocal);
   > 
   >     // cleanup.
   >     admin.topics().delete(tpName, false);
   > }
   > ```
   
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to