lordcheng10 commented on a change in pull request #12471:
URL: https://github.com/apache/pulsar/pull/12471#discussion_r742702270



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
             return selectedBundlesCache;
         }
 
+        AtomicBoolean isBalanced = new AtomicBoolean(true);
+        Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+        Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+        AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new 
AtomicDouble(0);
+        AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+        //  1. Divided into two categories:① (currentUsage > avgUsage); 
②(currentUsage < avgUsage);
         loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+            final double currentUsage = 
brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage 
+ threshold) {
+                isBalanced.set(false);
+                return;
+            }
+            if (currentUsage < avgUsage) {
+                double percentOfTrafficToAccept = avgUsage - currentUsage;
+                double trafficToAccept = percentOfTrafficToAccept * 
brokerCurrentThroughput;
+                //2. Calculate the sum of acceptable traffic in 
belowAvgUsageBrokers
+                
sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+                belowAvgUsageBrokers.put(broker, brokerData);
+            }
+            if (currentUsage > avgUsage) {
+                //3. Calculate the total traffic in overAvgUsageBrokers
+                
totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+                overAvgUsageBrokers.put(broker, brokerData);
+            }
+        });
+
+        if (isBalanced.get()) {
+            return selectedBundlesCache;
+        }
+
+        //4. Calculate the percentage of traffic to be migrated by each broker 
in overAvgUsageBrokers;
+        double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN

Review comment:
       >> Why do we calculate this as a generic average for all brokers? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to