michaeljmarshall commented on a change in pull request #12471:
URL: https://github.com/apache/pulsar/pull/12471#discussion_r738693507
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
return selectedBundlesCache;
}
+ AtomicBoolean isBalanced = new AtomicBoolean(true);
+ Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+ Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+ AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new AtomicDouble(0);
+ AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+ // 1. Divided into two categories:① (currentUsage > avgUsage); ②(currentUsage < avgUsage);
loadData.getBrokerData().forEach((broker, brokerData) -> {
+ final LocalBrokerData localData = brokerData.getLocalData();
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+ final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+ if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
+ isBalanced.set(false);
+ return;
+ }
+ if (currentUsage < avgUsage) {
+ double percentOfTrafficToAccept = avgUsage - currentUsage;
+ double trafficToAccept = percentOfTrafficToAccept * brokerCurrentThroughput;
+ //2. Calculate the sum of acceptable traffic in belowAvgUsageBrokers
+ sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+ belowAvgUsageBrokers.put(broker, brokerData);
+ }
+ if (currentUsage > avgUsage) {
+ //3. Calculate the total traffic in overAvgUsageBrokers
+ totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+ overAvgUsageBrokers.put(broker, brokerData);
+ }
+ });
+
+ if (isBalanced.get()) {
+ return selectedBundlesCache;
+ }
+
+ //4. Calculate the percentage of traffic to be migrated by each broker in overAvgUsageBrokers;
+ double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN
Review comment:
Why is `percentOfTrafficToOffload` now calculated once, as a single value
shared by all brokers? The original calculation was specific to each broker,
because the value is used to determine which bundles to unload so that that
broker's load drops below the average. With one averaged value applied to
every broker, we could unload too many bundles from some brokers and too few
from others.
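To make the concern concrete, here is a minimal sketch with made-up numbers. The class name, the `MARGIN` constant, and both helper methods are hypothetical and exist only for illustration. The per-broker formula is my assumption of what the existing calculation roughly looks like, and the shared formula mirrors the one in this diff.

```java
final class OffloadComparison {
    // Hypothetical stand-in for ADDITIONAL_THRESHOLD_PERCENT_MARGIN.
    static final double MARGIN = 0.05;

    // Assumed per-broker approach: the fraction depends on how far
    // this particular broker sits above the average.
    static double perBrokerFraction(double currentUsage, double avgUsage, double threshold) {
        return currentUsage - avgUsage - threshold + MARGIN;
    }

    // Approach in this diff: one fraction shared by every over-average broker.
    static double sharedFraction(double sumAcceptableTraffic, double totalOverAvgTraffic) {
        return MARGIN + sumAcceptableTraffic / totalOverAvgTraffic;
    }

    public static void main(String[] args) {
        double avgUsage = 0.6;
        double threshold = 0.1;

        // Two over-average brokers (usage 0.9 and 0.7), each carrying 1000 units of traffic,
        // and one below-average broker (usage 0.2) carrying 500 units.
        double sumAcceptable = (avgUsage - 0.2) * 500.0;
        double totalOverAvg = 1000.0 + 1000.0;

        double shared = sharedFraction(sumAcceptable, totalOverAvg);
        System.out.println("shared fraction for both brokers: " + shared);
        System.out.println("per-broker fraction at usage 0.9: "
                + perBrokerFraction(0.9, avgUsage, threshold));
        System.out.println("per-broker fraction at usage 0.7: "
                + perBrokerFraction(0.7, avgUsage, threshold));
    }
}
```

In this example the shared fraction is identical for the broker at 0.9 and the broker at 0.7, even though the first is much further above the average, whereas the per-broker fraction scales with each broker's own excess.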
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
return selectedBundlesCache;
}
+ AtomicBoolean isBalanced = new AtomicBoolean(true);
+ Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+ Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+ AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new AtomicDouble(0);
+ AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+ // 1. Divided into two categories:① (currentUsage > avgUsage); ②(currentUsage < avgUsage);
loadData.getBrokerData().forEach((broker, brokerData) -> {
+ final LocalBrokerData localData = brokerData.getLocalData();
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+ final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+ if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
+ isBalanced.set(false);
+ return;
+ }
+ if (currentUsage < avgUsage) {
+ double percentOfTrafficToAccept = avgUsage - currentUsage;
+ double trafficToAccept = percentOfTrafficToAccept * brokerCurrentThroughput;
+ //2. Calculate the sum of acceptable traffic in belowAvgUsageBrokers
+ sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+ belowAvgUsageBrokers.put(broker, brokerData);
+ }
+ if (currentUsage > avgUsage) {
+ //3. Calculate the total traffic in overAvgUsageBrokers
+ totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+ overAvgUsageBrokers.put(broker, brokerData);
+ }
+ });
+
+ if (isBalanced.get()) {
+ return selectedBundlesCache;
+ }
+
+ //4. Calculate the percentage of traffic to be migrated by each broker in overAvgUsageBrokers;
+ double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN
+ + sumOfAcceptableTrafficFromBelowAvgUsageBrokers.get() / totalTrafficOfOverAvgUsageBrokers.get();
+
+ //5. Select the bundle to unload
+ overAvgUsageBrokers.forEach((broker, brokerData) -> {
final LocalBrokerData localData = brokerData.getLocalData();
final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
- if (currentUsage < avgUsage + threshold) {
+ if (currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold) {
Review comment:
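With the new changes, every broker iterated over here is already above the
average, so the added `currentUsage > avgUsage - threshold` check is always
true. There is no longer a need to change this conditional; the original
`currentUsage < avgUsage + threshold` check is sufficient.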
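As a quick illustration, the sketch below compares the two conditions for usages strictly above the average. The class and method names are hypothetical, and it assumes `threshold` is non-negative.

```java
final class ConditionalEquivalence {
    // Condition as changed in this diff.
    static boolean twoSided(double currentUsage, double avgUsage, double threshold) {
        return currentUsage > avgUsage - threshold && currentUsage < avgUsage + threshold;
    }

    // Original condition before this diff.
    static boolean oneSided(double currentUsage, double avgUsage, double threshold) {
        return currentUsage < avgUsage + threshold;
    }

    // Returns true if both conditions agree for every sampled usage
    // strictly above avgUsage (100 samples in steps of 0.01).
    static boolean agreeAboveAverage(double avgUsage, double threshold) {
        for (int i = 1; i <= 100; i++) {
            double currentUsage = avgUsage + i * 0.01;
            if (twoSided(currentUsage, avgUsage, threshold)
                    != oneSided(currentUsage, avgUsage, threshold)) {
                return false;
            }
        }
        return true;
    }
}
```

The two checks only differ for brokers at or below `avgUsage - threshold`, and those are never present in `overAvgUsageBrokers`.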