This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new e90f0925bb1 [feat][broker] Support lower boundary shedding for 
ThresholdShedder (#17456)
e90f0925bb1 is described below

commit e90f0925bb182a777e1a308d1d0781813024d479
Author: feynmanlin <[email protected]>
AuthorDate: Fri Sep 23 09:19:12 2022 +0800

    [feat][broker] Support lower boundary shedding for ThresholdShedder (#17456)
    
     Support lower boundary shedding for ThresholdShedder (#17456)
    
    The existing ThresholdShedder has the following problems, for example:
    There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
    The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is 
72.73 + 10 = 82.73.
    Since 80 < 82.73, unload will not be triggered, and there is one idle 
Broker with a load of 0%.
    
    On the basis of ThresholdShedder, we add a lower boundary check on the 
load.
    When any broker has 【current usage < average usage - threshold】, the 
broker with the highest load will be triggered to unload bundles
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 ++
 .../broker/loadbalance/impl/ThresholdShedder.java  | 113 ++++++++++++++++-----
 .../loadbalance/impl/ThresholdShedderTest.java     | 111 ++++++++++++++++++++
 3 files changed, 207 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 06e193a4131..5942ee9a8e9 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2074,6 +2074,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = 
"org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When [current usage < average usage - threshold], "
+                    + "the broker with the highest load will be triggered to 
unload"
+    )
+    private boolean lowerBoundarySheddingEnabled = false;
+
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
             doc = "load balance placement strategy"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 05c69062663..928f045369c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -52,6 +52,9 @@ public class ThresholdShedder implements LoadSheddingStrategy 
{
     private static final Logger log = 
LoggerFactory.getLogger(ThresholdShedder.class);
     private final Multimap<String, String> selectedBundlesCache = 
ArrayListMultimap.create();
     private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
+
+    private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;
+
     private static final double MB = 1024 * 1024;
 
     private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
@@ -80,8 +83,7 @@ public class ThresholdShedder implements LoadSheddingStrategy 
{
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         final double minThroughputThreshold = 
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
 
-        final double avgUsage = getBrokerAvgUsage(
-                loadData, conf.getLoadBalancerHistoryResourcePercentage(), 
conf, sampleLog);
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
         if (sampleLog) {
             log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
         }
@@ -122,17 +124,34 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
                     broker, 100 * currentUsage, 100 * avgUsage, 100 * 
threshold, minimumThroughputToOffload / MB,
                     (brokerCurrentThroughput - minimumThroughputToOffload) / 
MB);
 
-            MutableDouble trafficMarkedToOffload = new MutableDouble(0);
-            MutableBoolean atLeastOneBundleSelected = new 
MutableBoolean(false);
-
             if (localData.getBundles().size() > 1) {
-                loadData.getBundleDataForLoadShedding().entrySet().stream()
-                    .map((e) -> {
-                        String bundle = e.getKey();
-                        BundleData bundleData = e.getValue();
-                        TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                        double throughput = shortTermData.getMsgThroughputIn() 
+ shortTermData.getMsgThroughputOut();
-                        return Pair.of(bundle, throughput);
+                filterAndSelectBundle(loadData, recentlyUnloadedBundles, 
broker, localData, minimumThroughputToOffload);
+            } else if (localData.getBundles().size() == 1) {
+                log.warn(
+                        "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
+                                + "No Load Shedding will be done on this 
broker",
+                        localData.getBundles().iterator().next(), broker);
+            } else {
+                log.warn("Broker {} is overloaded despite having no bundles", 
broker);
+            }
+        });
+        if (selectedBundlesCache.isEmpty() && 
conf.isLowerBoundarySheddingEnabled()) {
+            tryLowerBoundaryShedding(loadData, conf);
+        }
+        return selectedBundlesCache;
+    }
+
+    private void filterAndSelectBundle(LoadData loadData, Map<String, Long> 
recentlyUnloadedBundles, String broker,
+                                       LocalBrokerData localData, double 
minimumThroughputToOffload) {
+        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+        MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+        loadData.getBundleDataForLoadShedding().entrySet().stream()
+                .map((e) -> {
+                    String bundle = e.getKey();
+                    BundleData bundleData = e.getValue();
+                    TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
+                    double throughput = shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
+                    return Pair.of(bundle, throughput);
                 }).filter(e ->
                         !recentlyUnloadedBundles.containsKey(e.getLeft())
                 ).filter(e ->
@@ -147,21 +166,11 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
                         atLeastOneBundleSelected.setTrue();
                     }
                 });
-            } else if (localData.getBundles().size() == 1) {
-                log.warn(
-                        "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
-                                + "No Load Shedding will be done on this 
broker",
-                        localData.getBundles().iterator().next(), broker);
-            } else {
-                log.warn("Broker {} is overloaded despite having no bundles", 
broker);
-            }
-        });
-
-        return selectedBundlesCache;
     }
 
-    private double getBrokerAvgUsage(final LoadData loadData, final double 
historyPercentage,
+    private double getBrokerAvgUsage(final LoadData loadData,
                                      final ServiceConfiguration conf, boolean 
sampleLog) {
+        double historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
         double totalUsage = 0.0;
         int totalBrokers = 0;
 
@@ -227,4 +236,60 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, 
ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = 
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, 
canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, 
avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle 
unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is 
{}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * 
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = 
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed 
throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping 
bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, 
minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), 
maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 
0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;
+            }
+            // Whether any brokers with low usage in the cluster.
+            if (currentUsage < avgUsage - threshold) {
+                hasBrokerBelowLowerBound = true;
+            }
+        }
+        return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
index 80760a7afc3..8461f8ce74c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
@@ -47,6 +47,7 @@ public class ThresholdShedderTest {
 
     @BeforeMethod
     public void setup() {
+        conf.setLowerBoundarySheddingEnabled(false);
         thresholdShedder = new ThresholdShedder();
     }
 
@@ -224,4 +225,114 @@ public class ThresholdShedderTest {
         assertEquals(data.printResourceUsage(),
             "cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 
30.00%, bandwidthOut: 20.00%");
     }
+
+    @Test
+    public void testLowerBoundaryShedding() {
+        int numBundles = 10;
+        int brokerNum = 11;
+        int lowLoadNode = 10;
+        LoadData loadData = new LoadData();
+        double throughput = 100 * 1024 * 1024;
+        //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded 
at 0%.
+        //At this time, the average load is 80*10/11 = 72.73, and the 
threshold for rebalancing is 72.73 + 10 = 82.73.
+        //Since 80 < 82.73, rebalancing will not be trigger, and there is one 
Broker with load of 0.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            for (int j = 0; j < numBundles; j++) {
+                broker.getBundles().add("bundle-" + j);
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 
: throughput);
+                timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 
0 : throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 
100));
+            broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 
80, 100));
+            broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+            broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+            loadData.getBrokerData().put("broker-" + i, new 
BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = 
shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, 
conf);
+        assertFalse(bundlesToUnload.isEmpty());
+    }
+
+    @Test
+    public void testLowerBoundarySheddingNoBrokerToOffload() {
+        int numBundles = 10;
+        int brokerNum = 11;
+        LoadData loadData = new LoadData();
+        double throughput = 80 * 1024 * 1024;
+        //Load of all Brokers are 80%, and no Broker needs to offload.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            for (int j = 0; j < numBundles; j++) {
+                broker.getBundles().add("bundle-" + j);
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(throughput);
+                timeAverageMessageData.setMsgThroughputOut(throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(80, 100));
+            broker.setBandwidthOut(new ResourceUsage(80, 100));
+            broker.setMsgThroughputIn(throughput);
+            broker.setMsgThroughputOut(throughput);
+            loadData.getBrokerData().put("broker-" + i, new 
BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = 
shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, 
conf);
+        assertTrue(bundlesToUnload.isEmpty());
+    }
+
+    @Test
+    public void testLowerBoundarySheddingBrokerWithOneBundle() {
+        int brokerNum = 11;
+        int lowLoadNode = 5;
+        int brokerWithManyBundles = 3;
+        LoadData loadData = new LoadData();
+        double throughput = 100 * 1024 * 1024;
+        //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded 
at 0%.
+        //Only broker3 has 10 bundles.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            //Broker3 has 10 bundles
+            int numBundles = i == brokerWithManyBundles ? 10 : 1;
+            for (int j = 0; j < numBundles; j++) {
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 
: throughput);
+                timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 
0 : throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 
100));
+            broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 
80, 100));
+            broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+            broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+            loadData.getBrokerData().put("broker-" + i, new 
BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = 
shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, 
conf);
+        assertFalse(bundlesToUnload.isEmpty());
+        assertEquals(bundlesToUnload.size(), 1);
+        assertTrue(bundlesToUnload.containsKey("broker-3"));
+    }
 }

Reply via email to