lordcheng10 commented on a change in pull request #12471:
URL: https://github.com/apache/pulsar/pull/12471#discussion_r742702270



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
             return selectedBundlesCache;
         }
 
+        AtomicBoolean isBalanced = new AtomicBoolean(true);
+        Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+        Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+        AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new AtomicDouble(0);
+        AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+        //  1. Divided into two categories:① (currentUsage > avgUsage); ②(currentUsage < avgUsage);
         loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+            final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
+                isBalanced.set(false);
+                return;
+            }
+            if (currentUsage < avgUsage) {
+                double percentOfTrafficToAccept = avgUsage - currentUsage;
+                double trafficToAccept = percentOfTrafficToAccept * brokerCurrentThroughput;
+                //2. Calculate the sum of acceptable traffic in belowAvgUsageBrokers
+                sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+                belowAvgUsageBrokers.put(broker, brokerData);
+            }
+            if (currentUsage > avgUsage) {
+                //3. Calculate the total traffic in overAvgUsageBrokers
+                totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+                overAvgUsageBrokers.put(broker, brokerData);
+            }
+        });
+
+        if (isBalanced.get()) {
+            return selectedBundlesCache;
+        }
+
+        //4. Calculate the percentage of traffic to be migrated by each broker in overAvgUsageBrokers;
+        double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN

Review comment:
       >> Why do we calculate this as a generic average for all brokers? 
   
   In the example I mentioned above, the load utilization of every broker is
lower than the average utilization rate. In that case, the original
calculation method has no way to compute percentOfTrafficToOffload.
   
   With regard to this new change, I need to explain the following points:
   
   1. First, I define the equilibrium state as:
   
   avgUsage - threshold < currentUsage < avgUsage + threshold;
   
   2. The total traffic that the brokers below avgUsage can accept is the
total traffic that we should unload from the brokers above avgUsage.
Therefore, we only need to sum the traffic of all brokers above avgUsage
(overBrokerTrafficSum). Then sumBelowAvgTraffic / overBrokerTrafficSum gives
the fraction of its traffic that each broker above avgUsage should offload. In
the diff these are sumOfAcceptableTrafficFromBelowAvgUsageBrokers and
totalTrafficOfOverAvgUsageBrokers.
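
   The two points above can be sketched roughly as follows. This is only an
illustration, not the code in the PR: the class name, the method names, the
plain maps of usage and throughput, and the cap of the ratio at 1.0 are all
assumptions made for the example. The variable names follow the shorthand
used in point 2.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone sketch; not part of ThresholdShedder.
class OffloadRatioSketch {

    // Point 1: a broker is in equilibrium when
    // avgUsage - threshold < currentUsage < avgUsage + threshold.
    static boolean isInEquilibrium(double currentUsage, double avgUsage, double threshold) {
        return currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold;
    }

    // Point 2: fraction of its traffic that each broker above avgUsage would offload.
    // usage: broker -> resource usage as a fraction; throughput: broker -> in + out throughput.
    static double offloadRatio(Map<String, Double> usage,
                               Map<String, Double> throughput,
                               double avgUsage) {
        double sumBelowAvgTraffic = 0.0;   // traffic the below-average brokers can accept
        double overBrokerTrafficSum = 0.0; // total traffic of the above-average brokers
        for (Map.Entry<String, Double> entry : usage.entrySet()) {
            double currentUsage = entry.getValue();
            double traffic = throughput.getOrDefault(entry.getKey(), 0.0);
            if (currentUsage < avgUsage) {
                sumBelowAvgTraffic += (avgUsage - currentUsage) * traffic;
            } else if (currentUsage > avgUsage) {
                overBrokerTrafficSum += traffic;
            }
        }
        if (overBrokerTrafficSum == 0.0) {
            return 0.0; // nothing to offload from
        }
        // Assumption for this sketch: never ask a broker to offload more than it carries.
        return Math.min(1.0, sumBelowAvgTraffic / overBrokerTrafficSum);
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        Map<String, Double> throughput = new LinkedHashMap<>();
        usage.put("broker-a", 0.2); throughput.put("broker-a", 100.0);
        usage.put("broker-b", 0.4); throughput.put("broker-b", 200.0);
        usage.put("broker-c", 0.9); throughput.put("broker-c", 300.0);
        double avgUsage = 0.5;

        System.out.println("broker-c balanced: " + isInEquilibrium(0.9, avgUsage, 0.1));
        System.out.println("offload ratio: " + offloadRatio(usage, throughput, avgUsage));
    }
}
```

   With these made-up numbers, the below-average brokers can accept about
0.3 * 100 + 0.1 * 200 = 50 units of traffic, and the only above-average broker
carries 300, so it would be asked to offload roughly one sixth of its traffic.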
   
   @michaeljmarshall 



