codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r972048793


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
         assertEquals(data.printResourceUsage(),
             "cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 
30.00%, bandwidthOut: 20.00%");
     }
+
+    @Test
+    public void testRangeThroughput() {

Review Comment:
   ```suggestion
       public void testLowerBoundaryShedding() {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, 
LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, 
ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = 
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, 
canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, 
avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle 
unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is 
{}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * 
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = 
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed 
throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping 
bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, 
minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), 
maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;

Review Comment:
   The max usage broker load: > (avgUsage - threshold)
   The below lower broker load: < (avgUsage - threshold)
   
   After the max usage broker unloads to the lower broker, the max usage broker 
might become the lower broker, the lower broker becomes the max usage broker. 
Can this will lead to frequent bundle unloading?
   
   It looks like we need to change 
   
   ```java
   double minimumThroughputToOffload = brokerCurrentThroughput * threshold * 
LOWER_BOUNDARY_THRESHOLD_MARGIN;
   ```
   
   to 
   
   ```java
   double minimumThroughputToOffload = Math.min(brokerCurrentThroughput * 
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN, brokerCurrentThroughput - avgUsage 
- threshold);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, 
LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, 
ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = 
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, 
canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, 
avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle 
unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is 
{}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * 
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = 
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed 
throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping 
bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, 
minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), 
maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;

Review Comment:
   And we should also add a test for this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to