315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r971005712


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;
+            }
+            // Whether any brokers with low usage in the cluster.
+            if (currentUsage < avgUsage - threshold) {
+                hasBrokerBelowLowerBound = true;

Review Comment:
   The initial value of `maxUsage` has been set to `avgUsage - threshold`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to