poorbarcode commented on code in PR #21212:
URL: https://github.com/apache/pulsar/pull/21212#discussion_r1349615107
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java:
##########
@@ -3061,6 +3087,270 @@ public void testGlobalTopicPolicies() throws Exception {
}
+ @Test
+ public void testInitTopicPolicy() throws Exception {
+ // create topic
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // unload
+ admin.namespaces().unload(myNamespace);
+ // the policies cache
+ SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+
+ // set up policies
+ TopicName topicName = TopicName.get(topic);
+ TopicPolicies localInitPolicy =
TopicPolicies.builder().maxConsumerPerTopic(10).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
localInitPolicy).get();
+ TopicPolicies globalInitPolicy =
+
TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
globalInitPolicy).get();
+
+ Awaitility.await().untilAsserted(
+ () ->
assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+ &&
!topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+ .isCompletedExceptionally(), true));
+
+ // load up topic after the corresponding namespace's policy caches
initialization to make sure the topic
+ // polices applied in
`org.apache.pulsar.broker.service.persistent.PersistentTopic.initTopicPolicy`
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // the final policies take effect in topic
+ HierarchyTopicPolicies hierarchyTopicPolicies =
+
pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(),
10);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxConsumerPerTopic(), 20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
10);
+ assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(),
10);
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(),
null);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxProducerPerTopic(), 30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
null);
+ assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(),
30);
+ }
+
+ @Test
+ public void testInitPolicesCacheAndNotifyListeners() throws Exception {
+ // create topic
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // set up policies for testTopic
+ TopicName topicName = TopicName.get(topic);
+ TopicPolicies localInitPolicy =
TopicPolicies.builder().maxConsumerPerTopic(10).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
localInitPolicy).get();
+ TopicPolicies globalInitPolicy =
+
TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
globalInitPolicy).get();
+
+ /*
+ if the topic initialization is completed before the corresponding
namespace's policy caches initialization,
+ the topic policies will be applied in
+
`org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.initPolicesCacheAndNotifyListeners`,
+ otherwise it will be applied in
+
`org.apache.pulsar.broker.service.persistent.PersistentTopic.initTopicPolicy`
+
+ the topicPolicyEventsTopic initialization always completed before
the corresponding namespace's policy
+ caches initialization, use topicPolicyEventsTopic to verify
`initPolicesCacheAndNotifyListeners`
+ */
+ // set up policies for eventsTopic
+ TopicName eventsTopicName = TopicName.get(topicPolicyEventsTopic);
+ TopicPolicies eventsLocalInitPolicy =
TopicPolicies.builder().maxConsumerPerTopic(40).build();
+
pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(eventsTopicName,
eventsLocalInitPolicy).get();
+ TopicPolicies eventsGlobalInitPolicy =
+
TopicPolicies.builder().maxConsumerPerTopic(50).maxProducerPerTopic(60).isGlobal(true).build();
+
pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(eventsTopicName,
eventsGlobalInitPolicy).get();
+
+ // reload namespace to trigger init polices cache and notify listeners
+ admin.namespaces().unload(myNamespace);
+ // the policies cache
+ SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().untilAsserted(
+ () ->
assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+ &&
!topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+ .isCompletedExceptionally(), true));
+
+ // check testTopic start
+ // the final policies take effect in topic
+ HierarchyTopicPolicies hierarchyTopicPolicies =
+
pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(),
10);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxConsumerPerTopic(), 20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
10);
+ assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(),
10);
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(),
null);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxProducerPerTopic(), 30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
null);
+ assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(),
30);
+ // check testTopic end
+
+ // check eventsTopic start
+ // the final policies take effect in topic
+ HierarchyTopicPolicies eventsHierarchyTopicPolicies =
+
pulsar.getBrokerService().getTopics().get(topicPolicyEventsTopic).get().get().getHierarchyTopicPolicies();
+
assertEquals(topicPoliciesService.getTopicPolicies(eventsTopicName).getMaxConsumerPerTopic(),
40);
+ assertEquals(topicPoliciesService.getTopicPolicies(eventsTopicName,
true).getMaxConsumerPerTopic(), 50);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
50);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
40);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 40);
+
+
assertEquals(topicPoliciesService.getTopicPolicies(eventsTopicName).getMaxProducerPerTopic(),
null);
+ assertEquals(topicPoliciesService.getTopicPolicies(eventsTopicName,
true).getMaxProducerPerTopic(), 60);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
60);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
null);
+
assertEquals(eventsHierarchyTopicPolicies.getMaxProducersPerTopic().get(), 60);
+ // check eventsTopic end
+ }
+
+ @Test
+ public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception {
Review Comment:
Sorry, I forgot to rename this method. Please rename it to
`testGlobalPolicyAfterUnloadTopic`.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java:
##########
@@ -3061,6 +3084,104 @@ public void testGlobalTopicPolicies() throws Exception {
}
+ @Test
+ public void testInitPolicesCacheAndNotifyListeners() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // set up policies
+ TopicName topicName = TopicName.get(topic);
+ TopicPolicies localInitPolicy =
TopicPolicies.builder().maxConsumerPerTopic(10).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
localInitPolicy).get();
+ TopicPolicies globalInitPolicy =
+
TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
globalInitPolicy).get();
+
+ // the policies cache
+ SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ // reload namespace to trigger init polices cache and notify listeners
+ admin.namespaces().unload(myNamespace);
+
assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().untilAsserted(
+ () ->
assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+ &&
!topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+ .isCompletedExceptionally(), true));
+
+ // the final policies take effect in topic
+ HierarchyTopicPolicies hierarchyTopicPolicies =
+
pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(),
10);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxConsumerPerTopic(), 20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
10);
+ assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(),
10);
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(),
null);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxProducerPerTopic(), 30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
null);
+ assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(),
30);
+ }
+
+ @Test
+ public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws
Exception {
Review Comment:
I do not think the test you provided covers this case. Could you try
running the test below?
```java
/**
 * Triggers a new compaction on {@code topicName} and blocks until it is over.
 *
 * <p>The sequence is: wait for whatever compaction the topic currently tracks,
 * call {@code triggerCompaction()}, then wait again.
 *
 * @param topicName name of the topic to compact; resolved through the broker service
 * @throws Exception whatever {@code triggerCompaction()} propagates
 */
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
    final PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
    // Let any previously started task finish before starting another one.
    awaitNoCompactionInFlight(persistentTopic);
    persistentTopic.triggerCompaction();
    // Now wait for the task that was just triggered.
    awaitNoCompactionInFlight(persistentTopic);
}

/**
 * Polls, via Awaitility, until the topic's internal {@code currentCompaction} field is
 * either {@code null} or a future that is done.
 */
private void awaitNoCompactionInFlight(PersistentTopic persistentTopic) {
    Awaitility.await().untilAsserted(() -> {
        CompletableFuture<Long> current =
                WhiteboxImpl.getInternalState(persistentTopic, "currentCompaction");
        assertTrue(current == null || current.isDone());
    });
}
/**
 * Empties the topic-policies caches of the broker's {@link TopicPoliciesService}.
 *
 * <p>Does nothing when the service is the disabled implementation. Otherwise the service
 * must be a {@link SystemTopicBasedTopicPoliciesService}; its {@code policyCacheInitMap},
 * {@code policiesCache} and {@code globalPoliciesCache} fields are read reflectively and
 * cleared, after every future in {@code policyCacheInitMap} has been joined.
 *
 * <p>This method is not thread-safe: if another task is loading topic policies at the
 * same time, that task may end up working against stale or inconsistent state.
 */
private void clearTopicPoliciesCache() {
    final TopicPoliciesService service = pulsar.getTopicPoliciesService();
    if (service instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) {
        // Nothing is cached when topic policies are disabled.
        return;
    }
    assertTrue(service instanceof SystemTopicBasedTopicPoliciesService);

    final Map<NamespaceName, CompletableFuture<Void>> initFutures =
            WhiteboxImpl.getInternalState(service, "policyCacheInitMap");
    // Wait for every registered cache initialization before wiping anything.
    initFutures.values().forEach(CompletableFuture::join);

    final Map<TopicName, TopicPolicies> localCache =
            WhiteboxImpl.getInternalState(service, "policiesCache");
    final Map<TopicName, TopicPolicies> globalCache =
            WhiteboxImpl.getInternalState(service, "globalPoliciesCache");

    initFutures.clear();
    localCache.clear();
    globalCache.clear();
}
/**
 * Sets a local and a global dispatch rate on a fresh topic, compacts the namespace's
 * change-events topic, clears the in-memory policy caches, and then checks that the
 * policies returned by {@code getTopicPoliciesAsync} carry the local dispatch rate.
 */
@Test
public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception {
    final int localMsgRate = 2000;
    final int globalMsgRate = 1000;
    final String subName = "s1";
    final String namespacePrefix = "persistent://" + myNamespace + "/";
    final String topic = BrokerTestUtil.newUniqueName(namespacePrefix + "tp");
    final String changeEventsTopic = namespacePrefix + NAMESPACE_EVENTS_LOCAL_NAME;

    admin.topics().createNonPartitionedTopic(topic);
    admin.topics().createSubscription(topic, subName, MessageId.earliest);

    // Register both a local and a global dispatch rate for the topic.
    final DispatchRate localRate = new DispatchRateImpl(localMsgRate, 1, false, 1);
    final DispatchRate globalRate = new DispatchRateImpl(globalMsgRate, 1, false, 1);
    admin.topicPolicies(false).setDispatchRate(topic, localRate);
    admin.topicPolicies(true).setDispatchRate(topic, globalRate);

    // Compact the change-events topic, then drop the cached policies so that the
    // next lookup has to load them again.
    triggerAndWaitNewTopicCompaction(changeEventsTopic);
    clearTopicPoliciesCache();

    // The reloaded policies must be present and expose the local rate.
    final Optional<TopicPolicies> reloaded =
            pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topic)).join();
    assertTrue(reloaded.isPresent());
    final TopicPolicies policies = reloaded.get();
    assertEquals(policies.getDispatchRate().getDispatchThrottlingRateInMsg(), localMsgRate);

    // cleanup.
    admin.topics().delete(topic, false);
}
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java:
##########
@@ -3061,6 +3084,104 @@ public void testGlobalTopicPolicies() throws Exception {
}
+ @Test
+ public void testInitPolicesCacheAndNotifyListeners() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // set up policies
+ TopicName topicName = TopicName.get(topic);
+ TopicPolicies localInitPolicy =
TopicPolicies.builder().maxConsumerPerTopic(10).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
localInitPolicy).get();
+ TopicPolicies globalInitPolicy =
+
TopicPolicies.builder().maxConsumerPerTopic(20).maxProducerPerTopic(30).isGlobal(true).build();
+ pulsar.getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
globalInitPolicy).get();
+
+ // the policies cache
+ SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ // reload namespace to trigger init polices cache and notify listeners
+ admin.namespaces().unload(myNamespace);
+
assertNull(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)));
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().untilAsserted(
+ () ->
assertEquals(topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace)).isDone()
+ &&
!topicPoliciesService.getPoliciesCacheInit(NamespaceName.get(myNamespace))
+ .isCompletedExceptionally(), true));
+
+ // the final policies take effect in topic
+ HierarchyTopicPolicies hierarchyTopicPolicies =
+
pulsar.getBrokerService().getTopics().get(topic).get().get().getHierarchyTopicPolicies();
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxConsumerPerTopic(),
10);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxConsumerPerTopic(), 20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getGlobalTopicValue(),
20);
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().getLocalTopicValue(),
10);
+ assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(),
10);
+
+
assertEquals(topicPoliciesService.getTopicPolicies(topicName).getMaxProducerPerTopic(),
null);
+ assertEquals(topicPoliciesService.getTopicPolicies(topicName,
true).getMaxProducerPerTopic(), 30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getGlobalTopicValue(),
30);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().getLocalTopicValue(),
null);
+ assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(),
30);
+ }
+
+ @Test
+ public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws
Exception {
Review Comment:
I do not think the test you provided covers this case. Could you try
running the test below?
```java
/**
 * Triggers a new compaction on {@code topicName} and blocks until it is over.
 *
 * <p>The sequence is: wait for whatever compaction the topic currently tracks,
 * call {@code triggerCompaction()}, then wait again.
 *
 * @param topicName name of the topic to compact; resolved through the broker service
 * @throws Exception whatever {@code triggerCompaction()} propagates
 */
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
    final PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
    // Let any previously started task finish before starting another one.
    awaitNoCompactionInFlight(persistentTopic);
    persistentTopic.triggerCompaction();
    // Now wait for the task that was just triggered.
    awaitNoCompactionInFlight(persistentTopic);
}

/**
 * Polls, via Awaitility, until the topic's internal {@code currentCompaction} field is
 * either {@code null} or a future that is done.
 */
private void awaitNoCompactionInFlight(PersistentTopic persistentTopic) {
    Awaitility.await().untilAsserted(() -> {
        CompletableFuture<Long> current =
                WhiteboxImpl.getInternalState(persistentTopic, "currentCompaction");
        assertTrue(current == null || current.isDone());
    });
}
/**
 * Empties the topic-policies caches of the broker's {@link TopicPoliciesService}.
 *
 * <p>Does nothing when the service is the disabled implementation. Otherwise the service
 * must be a {@link SystemTopicBasedTopicPoliciesService}; its {@code policyCacheInitMap},
 * {@code policiesCache} and {@code globalPoliciesCache} fields are read reflectively and
 * cleared, after every future in {@code policyCacheInitMap} has been joined.
 *
 * <p>This method is not thread-safe: if another task is loading topic policies at the
 * same time, that task may end up working against stale or inconsistent state.
 */
private void clearTopicPoliciesCache() {
    final TopicPoliciesService service = pulsar.getTopicPoliciesService();
    if (service instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) {
        // Nothing is cached when topic policies are disabled.
        return;
    }
    assertTrue(service instanceof SystemTopicBasedTopicPoliciesService);

    final Map<NamespaceName, CompletableFuture<Void>> initFutures =
            WhiteboxImpl.getInternalState(service, "policyCacheInitMap");
    // Wait for every registered cache initialization before wiping anything.
    initFutures.values().forEach(CompletableFuture::join);

    final Map<TopicName, TopicPolicies> localCache =
            WhiteboxImpl.getInternalState(service, "policiesCache");
    final Map<TopicName, TopicPolicies> globalCache =
            WhiteboxImpl.getInternalState(service, "globalPoliciesCache");

    initFutures.clear();
    localCache.clear();
    globalCache.clear();
}
/**
 * Sets a local and a global dispatch rate on a fresh topic, compacts the namespace's
 * change-events topic, clears the in-memory policy caches, and then checks that the
 * policies returned by {@code getTopicPoliciesAsync} carry the local dispatch rate.
 */
@Test
public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception {
    final int localMsgRate = 2000;
    final int globalMsgRate = 1000;
    final String subName = "s1";
    final String namespacePrefix = "persistent://" + myNamespace + "/";
    final String topic = BrokerTestUtil.newUniqueName(namespacePrefix + "tp");
    final String changeEventsTopic = namespacePrefix + NAMESPACE_EVENTS_LOCAL_NAME;

    admin.topics().createNonPartitionedTopic(topic);
    admin.topics().createSubscription(topic, subName, MessageId.earliest);

    // Register both a local and a global dispatch rate for the topic.
    final DispatchRate localRate = new DispatchRateImpl(localMsgRate, 1, false, 1);
    final DispatchRate globalRate = new DispatchRateImpl(globalMsgRate, 1, false, 1);
    admin.topicPolicies(false).setDispatchRate(topic, localRate);
    admin.topicPolicies(true).setDispatchRate(topic, globalRate);

    // Compact the change-events topic, then drop the cached policies so that the
    // next lookup has to load them again.
    triggerAndWaitNewTopicCompaction(changeEventsTopic);
    clearTopicPoliciesCache();

    // The reloaded policies must be present and expose the local rate.
    final Optional<TopicPolicies> reloaded =
            pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topic)).join();
    assertTrue(reloaded.isPresent());
    final TopicPolicies policies = reloaded.get();
    assertEquals(policies.getDispatchRate().getDispatchThrottlingRateInMsg(), localMsgRate);

    // cleanup.
    admin.topics().delete(topic, false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]