AnonHxy commented on a change in pull request #14038:
URL: https://github.com/apache/pulsar/pull/14038#discussion_r797530717



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -318,6 +318,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
 
                     this.updateTopicPolicyByNamespacePolicy(policies);
 
+                    this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);

Review comment:
       Apply the namespace-level dispatch rate policy here, when the persistent topic is initialized.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -169,6 +169,25 @@ private DispatchRate createDispatchRate() {
      * broker-level
      */
     public void updateDispatchRate() {
+        final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
+        if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+            DispatchRate dispatchRateValue = null;
+            switch (type) {
+                case TOPIC:
+                    dispatchRateValue = topic.getDispatchRate();
+            }
+            if (type == Type.BROKER) {

Review comment:
       This simplifies how the `BROKER` type updates its dispatch rate. Maybe I should move this code outside of the `if` block.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -224,6 +230,25 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
                 type -> this.topicPolicies.getBackLogQuotaMap().get(type)
                         .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+        updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
+    }
+
+    private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
+        DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
+        if (dispatchRate == null) {
+            dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
+        }
+        topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
+    }
+
+    private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
+        if (dispatchRate != null
+            && (dispatchRate.getDispatchThrottlingRateInMsg() > 0
+            || dispatchRate.getDispatchThrottlingRateInByte() > 0)) {
+            return dispatchRate;

Review comment:
       
       `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter#isDispatchRateEnabled` shows that users can set `dispatchThrottlingRateInMsg` and `dispatchThrottlingRateInByte` to negative values to disable the rate limiter.
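
       A minimal, self-contained sketch of the lookup-and-normalize step in the hunk above. `DispatchRateSketch`, `Rate`, and `resolveNamespaceRate` are hypothetical stand-ins rather than Pulsar classes, and the "enabled" check is assumed to mirror the condition in `normalize` (at least one of the two rates greater than zero):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in classes; these are NOT Pulsar's DispatchRateImpl or AbstractTopic.
final class DispatchRateSketch {

    /** Holds only the two throttling fields discussed in the review comment. */
    static final class Rate {
        final int rateInMsg;
        final long rateInByte;

        Rate(int rateInMsg, long rateInByte) {
            this.rateInMsg = rateInMsg;
            this.rateInByte = rateInByte;
        }
    }

    /** Assumed semantics: a rate is enabled if either limit is strictly positive. */
    static boolean isEnabled(Rate rate) {
        return rate != null && (rate.rateInMsg > 0 || rate.rateInByte > 0);
    }

    /** Maps a disabled (for example, all-negative) rate to null so callers treat it as "not set". */
    static Rate normalize(Rate rate) {
        return isEnabled(rate) ? rate : null;
    }

    /** Looks up the per-cluster topic rate first, then falls back to the cluster rate. */
    static Rate resolveNamespaceRate(Map<String, Rate> topicRates,
                                     Map<String, Rate> clusterRates,
                                     String cluster) {
        Rate rate = topicRates.get(cluster);
        if (rate == null) {
            rate = clusterRates.get(cluster);
        }
        return normalize(rate);
    }

    public static void main(String[] args) {
        Map<String, Rate> topicRates = new HashMap<>();
        Map<String, Rate> clusterRates = new HashMap<>();
        clusterRates.put("standalone", new Rate(100, -1L));

        Rate resolved = resolveNamespaceRate(topicRates, clusterRates, "standalone");
        System.out.println(resolved == null ? "disabled" : "msg rate = " + resolved.rateInMsg);
    }
}
```

       Note that, as in the hunk, the fallback to the cluster rate happens only when the topic-rate entry is missing; an entry that is present but disabled normalizes to `null` without falling back.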



