This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74eae1a729c Optimized namespace-level dispathcRateLimiter update (#15315)
74eae1a729c is described below
commit 74eae1a729c33c9fead9d54594bdee5fac8ab153
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Apr 26 11:58:06 2022 +0800
Optimized namespace-level dispathcRateLimiter update (#15315)
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 11 ++---------
1 file changed, 2 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eed8ba0d3b8..bfe0b644109 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2398,9 +2398,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
schemaValidationEnforced = data.schema_validation_enforced;
- //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
- Optional<TopicPolicies> topicPolicies = getTopicPolicies();
-
initializeRateLimiterIfNeeded();
updatePublishDispatcher();
@@ -2418,12 +2415,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
- // If the topic-level policy already exists, the namespace-level policy cannot override
- // the topic-level policy.
- if (dispatcher != null && (!topicPolicies.isPresent() || !topicPolicies.get()
- .isSubscriptionDispatchRateSet())) {
- dispatcher.getRateLimiter()
- .ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
+ if (dispatcher != null) {
+ dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
}
}));
});