michaeljmarshall commented on a change in pull request #12471:
URL: https://github.com/apache/pulsar/pull/12471#discussion_r738693507



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
             return selectedBundlesCache;
         }
 
+        AtomicBoolean isBalanced = new AtomicBoolean(true);
+        Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+        Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+        AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new AtomicDouble(0);
+        AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+        //  1. Divided into two categories:① (currentUsage > avgUsage); ②(currentUsage < avgUsage);
         loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+            final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
+                isBalanced.set(false);
+                return;
+            }
+            if (currentUsage < avgUsage) {
+                double percentOfTrafficToAccept = avgUsage - currentUsage;
+                double trafficToAccept = percentOfTrafficToAccept * brokerCurrentThroughput;
+                //2. Calculate the sum of acceptable traffic in belowAvgUsageBrokers
+                sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+                belowAvgUsageBrokers.put(broker, brokerData);
+            }
+            if (currentUsage > avgUsage) {
+                //3. Calculate the total traffic in overAvgUsageBrokers
+                totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+                overAvgUsageBrokers.put(broker, brokerData);
+            }
+        });
+
+        if (isBalanced.get()) {
+            return selectedBundlesCache;
+        }
+
+        //4. Calculate the percentage of traffic to be migrated by each broker in overAvgUsageBrokers;
+        double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN

Review comment:
       Why do we calculate this as a single averaged value for all brokers? The 
original `percentOfTrafficToOffload` was computed per broker, because it is used 
to determine which bundles to unload so that that broker's load drops below the 
average. With one averaged value shared by all brokers, we could unload too many 
bundles from some brokers and too few from others.
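
To make the concern concrete, here is a minimal sketch rather than the actual `ThresholdShedder` code. It assumes the per-broker value has the form `currentUsage - avgUsage + margin`; the margin of 0.05, the class and method names, the broker names, and all usage and traffic numbers are hypothetical and chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class OffloadPercentSketch {
    // Assumed margin, for illustration only; not taken from the PR.
    static final double MARGIN = 0.05;

    // Per-broker value: how far this broker sits above the average, plus the margin.
    // The exact original formula is an assumption here.
    static double perBrokerPercent(double currentUsage, double avgUsage) {
        return currentUsage - avgUsage + MARGIN;
    }

    // Shared value, shaped like the expression in the diff:
    // one number applied to every over-average broker.
    static double sharedPercent(double sumAcceptableTraffic, double totalOverAvgTraffic) {
        return MARGIN + sumAcceptableTraffic / totalOverAvgTraffic;
    }

    public static void main(String[] args) {
        double avgUsage = 0.50;

        // Hypothetical over-average brokers and their resource usage.
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("broker-a", 0.90);
        usage.put("broker-b", 0.60);

        // Hypothetical aggregates: traffic the below-average brokers can accept,
        // and the total traffic on the over-average brokers.
        double shared = sharedPercent(30.0, 200.0);

        usage.forEach((broker, currentUsage) -> System.out.printf(
                "%s: per-broker=%.2f shared=%.2f%n",
                broker, perBrokerPercent(currentUsage, avgUsage), shared));
    }
}
```

With these made-up numbers the shared value falls between the two per-broker values, so the heavily loaded broker would shed less than it needs to and the lightly loaded one would shed more.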

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
             return selectedBundlesCache;
         }
 
+        AtomicBoolean isBalanced = new AtomicBoolean(true);
+        Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+        Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+        AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new AtomicDouble(0);
+        AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+        //  1. Divided into two categories:① (currentUsage > avgUsage); ②(currentUsage < avgUsage);
         loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+            final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
+                isBalanced.set(false);
+                return;
+            }
+            if (currentUsage < avgUsage) {
+                double percentOfTrafficToAccept = avgUsage - currentUsage;
+                double trafficToAccept = percentOfTrafficToAccept * brokerCurrentThroughput;
+                //2. Calculate the sum of acceptable traffic in belowAvgUsageBrokers
+                sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+                belowAvgUsageBrokers.put(broker, brokerData);
+            }
+            if (currentUsage > avgUsage) {
+                //3. Calculate the total traffic in overAvgUsageBrokers
+                totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+                overAvgUsageBrokers.put(broker, brokerData);
+            }
+        });
+
+        if (isBalanced.get()) {
+            return selectedBundlesCache;
+        }
+
+        //4. Calculate the percentage of traffic to be migrated by each broker in overAvgUsageBrokers;
+        double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN
+                + sumOfAcceptableTrafficFromBelowAvgUsageBrokers.get() / totalTrafficOfOverAvgUsageBrokers.get();
+
+        //5. Select the bundle to unload
+        overAvgUsageBrokers.forEach((broker, brokerData) -> {
             final LocalBrokerData localData = brokerData.getLocalData();
             final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
 
-            if (currentUsage < avgUsage + threshold) {
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {

Review comment:
       With the new changes, this loop only iterates over brokers that are above 
average, so there is no longer a need to update the logic for this conditional.
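
As a quick illustration of why the extra lower-bound check adds nothing here, the sketch below compares the two conditions for brokers above the average. It assumes a non-negative `threshold`; the class and method names and the sample values are hypothetical.

```java
class ConditionSketch {
    // The updated condition from the diff.
    static boolean rangeCheck(double currentUsage, double avgUsage, double threshold) {
        return currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold;
    }

    // The condition before this change.
    static boolean originalCheck(double currentUsage, double avgUsage, double threshold) {
        return currentUsage < avgUsage + threshold;
    }

    // Returns true if both checks agree for a sweep of usages strictly above avgUsage.
    // Assumes threshold >= 0, so "currentUsage > avgUsage - threshold" always holds here.
    static boolean agreeForOverAverage(double avgUsage, double threshold) {
        for (int i = 1; i <= 100; i++) {
            double currentUsage = avgUsage + i * 0.01;
            if (rangeCheck(currentUsage, avgUsage, threshold)
                    != originalCheck(currentUsage, avgUsage, threshold)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("agree above average: " + agreeForOverAverage(0.50, 0.10));
        // The checks only differ for a broker well below the average.
        System.out.println("range check below average: " + rangeCheck(0.20, 0.50, 0.10));
        System.out.println("original check below average: " + originalCheck(0.20, 0.50, 0.10));
    }
}
```

The two checks can only disagree for a broker at or below `avgUsage - threshold`, and such a broker never reaches this loop once it iterates over `overAvgUsageBrokers`.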



