merlimat commented on a change in pull request #634: PIP-3 : Introduce message-dispatch rate limiting
URL: https://github.com/apache/incubator-pulsar/pull/634#discussion_r131996441
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 ##########
 @@ -1016,9 +1018,47 @@ private void updateConfigurationAndRegisterListeners() {
                 log.warn("Failed to change load manager due to {}", ex);
             }
         });
+        // add listener to update message-dispatch-rate in msg
+        registerConfigurationListener("dispatchRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
+            if ((int) dispatchRatePerTopicInMsg <= 0 && pulsar.getConfiguration().getDispatchRatePerTopicInByte() > 0) {
+                // if msg-dispatch-rate is disabled then fall back to byte-dispatch-rate
+                updateTopicMessageRate(new DispatchRate(pulsar.getConfiguration().getDispatchRatePerTopicInByte(),
+                        DispatchRateType.byteRate));
+            } else {
+                updateTopicMessageRate(new DispatchRate((int) dispatchRatePerTopicInMsg, DispatchRateType.messageRate));
+            }
+        });
+        // add listener to update message-dispatch-rate in byte
+        registerConfigurationListener("dispatchRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
+            // always give first priority to msg-dispatch-rate. don't set byte-dispatch-rate if msg-dispatch-rate is
+            // already configured
+            if (pulsar.getConfiguration().getDispatchRatePerTopicInMsg() > 0) {
+                return;
+            }
+            updateTopicMessageRate(new DispatchRate((long) dispatchRatePerTopicInByte, DispatchRateType.byteRate));
+        });
         // add more listeners here
     }
 
+    private void updateTopicMessageRate(final DispatchRate dispatchRate) {
+        this.pulsar().getExecutor().submit(() -> {
+            // update message-rate for each topic
+            multiLayerTopicsMap.forEach((namespace, bundle) -> {
+                bundle.forEach((name, topics) -> {
+                    topics.forEach((topicName, topic) -> {
+                        if (topic instanceof PersistentTopic) {
 
 Review comment:
   Can't we just iterate on the flat topics map?
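
   A minimal sketch of what iterating a flat map could look like, assuming the broker also keeps a single map from topic name to a future of the loaded topic. The types below (`Topic`, `PersistentTopic`, `NonPersistentTopic`), the `updateDispatchRate` method, and the `FlatTopicIteration` helper are hypothetical stand-ins for illustration, not the actual Pulsar classes or the change proposed in this PR.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the broker's topic types (illustration only).
interface Topic {}

class PersistentTopic implements Topic {
    volatile long dispatchRate = 0;

    // Hypothetical setter; the real topic class may expose something different.
    void updateDispatchRate(long rate) {
        this.dispatchRate = rate;
    }
}

class NonPersistentTopic implements Topic {}

class FlatTopicIteration {
    // Assumed shape: one flat map keyed by topic name, so no namespace/bundle nesting.
    // Returns the number of persistent topics whose rate was updated.
    static int updateAll(Map<String, CompletableFuture<Topic>> topics, long rate) {
        int updated = 0;
        for (CompletableFuture<Topic> future : topics.values()) {
            // Skip topics that are still loading or whose load failed.
            if (!future.isDone() || future.isCompletedExceptionally()) {
                continue;
            }
            Topic topic = future.getNow(null);
            if (topic instanceof PersistentTopic) {
                ((PersistentTopic) topic).updateDispatchRate(rate);
                updated++;
            }
        }
        return updated;
    }
}
```

   A single pass over the values replaces the three nested `forEach` calls; the `instanceof PersistentTopic` check from the diff is kept as-is.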
 