poorbarcode commented on code in PR #21212:
URL: https://github.com/apache/pulsar/pull/21212#discussion_r1349491768
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3565,10 +3566,11 @@ private CompletableFuture<Void>
updateSubscriptionsDispatcherRateLimiter() {
protected CompletableFuture<Void> initTopicPolicy() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
- return CompletableFuture.completedFuture(null).thenRunAsync(() ->
onUpdate(
- brokerService.getPulsar().getTopicPoliciesService()
-
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
- brokerService.getTopicOrderedExecutor());
+ return CompletableFuture.completedFuture(null).thenRunAsync(() -> {
+ TopicPoliciesService topicPoliciesService =
brokerService.getPulsar().getTopicPoliciesService();
+
onUpdate(topicPoliciesService.getLocalTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic)));
+
onUpdate(topicPoliciesService.getGlobalTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic)));
Review Comment:
I wrote a test for this case, could you add it into current PR?
```java
@Test
public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://" +
myNamespace + "/tp");
final String subscriptionName = "s1";
final int dispatchThrottlingRateInMsg = 1000;
admin.topics().createNonPartitionedTopic(tpName);
PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
admin.topics().createSubscription(tpName, subscriptionName,
MessageId.earliest);
Producer producer = pulsarClient.newProducer().topic(tpName).create();
// Set global policy.
DispatchRate dispatchRate = new
DispatchRateImpl(dispatchThrottlingRateInMsg, 1, false, 1);
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRate);
// Assert policy was affected.
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
HierarchyTopicPolicies policies =
persistentTopic1.getHierarchyTopicPolicies();
assertNotNull(policies);
assertEquals(policies.getDispatchRate().getTopicValue().getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
DispatchRate dispatchRateAp =
admin.topicPolicies(true).getDispatchRate(tpName, true);
assertEquals(dispatchRateAp.getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
});
// Unload topic and check again.
admin.topics().unload(tpName);
// Wait topic load complete.
producer.send("1".getBytes(StandardCharsets.UTF_8));
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
// Assert policy was affected.
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
HierarchyTopicPolicies policies =
persistentTopic2.getHierarchyTopicPolicies();
assertNotNull(policies);
assertEquals(policies.getDispatchRate().getTopicValue().getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
DispatchRate dispatchRateAp =
admin.topicPolicies(true).getDispatchRate(tpName, true);
assertEquals(dispatchRateAp.getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
});
// cleanup.
producer.close();
admin.topics().delete(tpName, false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]