This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d58461d7d56 [improve][broker][branch-3.0] PIP-364: Introduce a new 
load balance algorithm AvgShedder (#23053)
d58461d7d56 is described below

commit d58461d7d562610d648fdf7e5629484e3693a84c
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jul 19 15:52:38 2024 +0800

    [improve][broker][branch-3.0] PIP-364: Introduce a new load balance 
algorithm AvgShedder (#23053)
---
 conf/broker.conf                                   |  19 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  36 ++-
 .../pulsar/broker/loadbalance/impl/AvgShedder.java | 318 +++++++++++++++++++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  16 +-
 .../ModularLoadManagerStrategyTest.java            |  43 +++
 .../broker/loadbalance/impl/AvgShedderTest.java    | 283 ++++++++++++++++++
 6 files changed, 711 insertions(+), 4 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2a9641b5b90..f07394a84df 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1418,6 +1418,25 @@ loadBalancerBundleUnloadMinThroughputThreshold=10
 # Time to wait for the unloading of a namespace bundle
 namespaceBundleUnloadingTimeoutMs=60000
 
+# Configuration for AvgShedder, a new load shedding and placement strategy.
+# The low threshold for the difference in load score (0-100) between the highest and lowest loaded brokers.
+loadBalancerAvgShedderLowThreshold = 15
+
+# The high threshold for the difference in load score (0-100) between the highest and lowest loaded brokers.
+loadBalancerAvgShedderHighThreshold = 40
+
+# The number of times the score difference must reach the low threshold before bundles are unloaded.
+loadBalancerAvgShedderHitCountLowThreshold = 8
+
+# The number of times the score difference must reach the high threshold before bundles are unloaded.
+loadBalancerAvgShedderHitCountHighThreshold = 2
+
+# In the UniformLoadShedder and AvgShedder strategies, the maximum unload ratio.
+# For AvgShedder, it is recommended to set this to 0.5, so that the load is distributed evenly
+# between the highest and lowest loaded brokers.
+maxUnloadPercentage = 0.2
+
+
 ### --- Load balancer extension --- ###
 
 # Option to enable the debug mode for the load balancer logics.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6d4635681ed..b3bc0f7e414 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2395,21 +2395,51 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(
             dynamic = true,
             category = CATEGORY_LOAD_BALANCER,
-            doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
+            doc = "The low threshold for the difference between the highest and lowest loaded brokers."
+    )
+    // AvgShedder compares broker scores: weighted maximum resource usage scaled by 100.
+    private int loadBalancerAvgShedderLowThreshold = 15;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "The high threshold for the difference between the highest and lowest loaded brokers."
+    )
+    private int loadBalancerAvgShedderHighThreshold = 40;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "The number of times the low threshold is triggered before the bundle is unloaded."
+    )
+    private int loadBalancerAvgShedderHitCountLowThreshold = 8;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "The number of times the high threshold is triggered before the bundle is unloaded."
+    )
+    private int loadBalancerAvgShedderHitCountHighThreshold = 2;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum message that triggers unload."
     )
     private int minUnloadMessage = 1000;
 
     @FieldContext(
             dynamic = true,
             category = CATEGORY_LOAD_BALANCER,
-            doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
+            doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum throughput that triggers unload."
     )
     private int minUnloadMessageThroughput = 1 * 1024 * 1024;
 
     @FieldContext(
             dynamic = true,
             category = CATEGORY_LOAD_BALANCER,
-            doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
+            doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio. "
+                    + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load "
+                    + "evenly between the highest and lowest brokers."
     )
     private double maxUnloadPercentage = 0.2;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
new file mode 100644
index 00000000000..e33ff097164
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
+
+/**
+ * Load shedding and bundle placement strategy that pairs brokers by load score.
+ *
+ * <p>Each broker is scored as its weighted maximum resource usage (CPU, direct memory, bandwidth in/out
+ * weights taken from the configuration) multiplied by 100. Brokers are sorted by ascending score and
+ * paired from the outside in: the lowest-scored with the highest-scored, the second lowest with the second
+ * highest, and so on. When a pair's score difference reaches {@code loadBalancerAvgShedderLowThreshold},
+ * the low hit counter of both brokers is incremented. When it also reaches
+ * {@code loadBalancerAvgShedderHighThreshold}, the high hit counter is incremented too. Once either counter
+ * of either broker reaches its configured hit-count threshold, bundles are unloaded from the higher-scored
+ * broker. {@link #selectBroker} later places those bundles on the lower-scored broker of the same pair.
+ *
+ * <p>All state is kept in plain {@link HashMap}s without synchronization.
+ * NOTE(review): this presumably relies on the load manager invoking the strategy from a single thread — confirm.
+ */
+@Slf4j
+public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy {
+    // Target broker of each bundle. It is set to the paired underloaded broker when the bundle is chosen
+    // for unloading, or to the broker picked by selectBroker. Entries are overwritten but never removed.
+    private final Map<BundleData, String> bundleBrokerMap = new HashMap<>();
+    // Broker -> load score (nominally 0-100), recomputed on every shedding round.
+    private final Map<String, Double> brokerScoreMap = new HashMap<>();
+    // Broker -> hit counters for the high / low threshold. A counter is incremented each round the broker's
+    // pair difference reaches the threshold, and reset when it falls below it or once shedding is triggered.
+    private final Map<String, MutableInt> brokerHitCountForHigh = new HashMap<>();
+    private final Map<String, MutableInt> brokerHitCountForLow = new HashMap<>();
+    private static final double MB = 1024 * 1024;
+
+    /**
+     * Selects bundles to unload, keyed by the broker that currently owns them.
+     *
+     * <p>Bundles are only chosen for broker pairs where at least one of the two brokers has reached the
+     * configured high or low hit-count threshold. The counters of both brokers are then reset.
+     *
+     * @param loadData current load data of all brokers and bundles
+     * @param conf broker configuration supplying the AvgShedder thresholds and unload limits
+     * @return a multimap from overloaded broker to the bundles to unload from it; empty when nothing should move
+     */
+    @Override
+    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
+        // result returned by shedding, map broker to bundles.
+        Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
+
+        // configuration for shedding.
+        final double minThroughputThreshold = conf.getMinUnloadMessageThroughput();
+        final double minMsgThreshold = conf.getMinUnloadMessage();
+        final double maxUnloadPercentage = conf.getMaxUnloadPercentage();
+        final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold();
+        final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold();
+        final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold();
+        final int hitCountLowThreshold = conf.getLoadBalancerAvgShedderHitCountLowThreshold();
+        if (log.isDebugEnabled()) {
+            log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, "
+                            + "minMsgThreshold:{}, minThroughputThreshold:{}",
+                    highThreshold, lowThreshold, hitCountHighThreshold, hitCountLowThreshold,
+                    minMsgThreshold, minThroughputThreshold);
+        }
+
+        List<String> brokers = calculateScoresAndSort(loadData, conf);
+        log.info("sorted broker list:{}", brokers);
+
+        // find broker pairs for shedding; this also updates the hit counters.
+        List<Pair<String, String>> pairs = findBrokerPairs(brokers, lowThreshold, highThreshold);
+        log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", brokerHitCountForHigh, brokerHitCountForLow);
+        if (pairs.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("there is no any overload broker, no need to shedding bundles.");
+            }
+            brokerHitCountForHigh.clear();
+            brokerHitCountForLow.clear();
+            return selectedBundlesCache;
+        }
+
+        // choosing bundles to unload.
+        for (Pair<String, String> pair : pairs) {
+            String overloadedBroker = pair.getRight();
+            String underloadedBroker = pair.getLeft();
+
+            // Only act once the imbalance has been seen often enough (presumably to ignore short spikes).
+            if (!isHitCountThresholdReached(underloadedBroker, overloadedBroker,
+                    hitCountHighThreshold, hitCountLowThreshold)) {
+                continue;
+            }
+
+            // if hit, remove the entries so the counters of both brokers start over.
+            brokerHitCountForHigh.remove(underloadedBroker);
+            brokerHitCountForHigh.remove(overloadedBroker);
+            brokerHitCountForLow.remove(underloadedBroker);
+            brokerHitCountForLow.remove(overloadedBroker);
+
+            // select bundle for unloading.
+            selectBundleForUnloading(loadData, overloadedBroker, underloadedBroker, minThroughputThreshold,
+                    minMsgThreshold, maxUnloadPercentage, selectedBundlesCache);
+        }
+        return selectedBundlesCache;
+    }
+
+    /**
+     * Returns whether either broker of a pair has reached the high or the low hit-count threshold.
+     * Missing counters are created at zero. Evaluation stops at the first counter that has reached its threshold.
+     */
+    private boolean isHitCountThresholdReached(String underloadedBroker, String overloadedBroker,
+                                               int hitCountHighThreshold, int hitCountLowThreshold) {
+        return hitCount(brokerHitCountForHigh, underloadedBroker) >= hitCountHighThreshold
+                || hitCount(brokerHitCountForHigh, overloadedBroker) >= hitCountHighThreshold
+                || hitCount(brokerHitCountForLow, underloadedBroker) >= hitCountLowThreshold
+                || hitCount(brokerHitCountForLow, overloadedBroker) >= hitCountLowThreshold;
+    }
+
+    // Returns the broker's counter from the given map, inserting a zero counter when absent.
+    private static int hitCount(Map<String, MutableInt> hitCounts, String broker) {
+        return hitCounts.computeIfAbsent(broker, k -> new MutableInt(0)).intValue();
+    }
+
+    /**
+     * Chooses bundles of {@code overloadedBroker} to move to {@code underloadedBroker}.
+     *
+     * <p>The amount to move is {@code maxUnloadPercentage} of the message-rate difference between the two
+     * brokers. If that amount does not exceed {@code minMsgThreshold}, the throughput difference is used
+     * instead. If neither amount exceeds its minimum, nothing is unloaded.
+     *
+     * <p>Bundles that were not recently unloaded are visited from the highest to the lowest traffic. Each one
+     * is taken greedily while its traffic fits in the remaining budget. A chosen bundle is added to
+     * {@code selectedBundlesCache} under the overloaded broker and mapped to the underloaded broker in
+     * {@code bundleBrokerMap}.
+     */
+    private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker,
+                                          double minThroughputThreshold, double minMsgThreshold,
+                                          double maxUnloadPercentage, Multimap<String, String> selectedBundlesCache) {
+        // calculate how much throughput to unload.
+        LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData();
+        LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData();
+
+        double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut();
+        double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut();
+
+        double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut();
+        double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut();
+
+        double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage;
+        double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage;
+
+        boolean isMsgRateToOffload;
+        // Remaining traffic budget; mutable so the stream's forEach below can decrement it.
+        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+
+        if (msgRequiredFromUnloadedBundles > minMsgThreshold) {
+            isMsgRateToOffload = true;
+            trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles);
+        } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) {
+            isMsgRateToOffload = false;
+            trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles);
+        } else {
+            log.info(
+                    "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is "
+                            + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s"
+                            + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.",
+                    overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB,
+                    minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold);
+            return;
+        }
+
+        if (maxLocalBrokerData.getBundles().size() == 1) {
+            // NOTE(review): despite this message, the code below still considers the sole bundle for
+            // unloading — confirm whether an early return is intended here.
+            log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+                            + "No Load Shedding will be done on this broker",
+                    maxLocalBrokerData.getBundles().iterator().next(), overloadedBroker);
+        } else if (maxLocalBrokerData.getBundles().isEmpty()) {
+            log.warn("Broker {} is overloaded despite having no bundles", overloadedBroker);
+        }
+
+        // do shedding
+        log.info(
+                "broker:[{}] is planning to shed bundles to broker:[{}]. "
+                        + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+                        + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+                        + "isMsgRateToOffload:{},  trafficMarkedToOffload:{}",
+                overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput,
+                maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate,
+                isMsgRateToOffload, trafficMarkedToOffload);
+
+        // Visit the overloaded broker's bundles from the heaviest to the lightest; take each one that still fits.
+        loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e ->
+                maxLocalBrokerData.getBundles().contains(e.getKey())
+        ).filter(e ->
+                !loadData.getRecentlyUnloadedBundles().containsKey(e.getKey())
+        ).map((e) -> {
+            BundleData bundleData = e.getValue();
+            TimeAverageMessageData shortTermData = bundleData.getShortTermData();
+            double traffic = isMsgRateToOffload
+                    ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut()
+                    : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
+            return Pair.of(e, traffic);
+        }).sorted((e1, e2) ->
+                Double.compare(e2.getRight(), e1.getRight())
+        ).forEach(e -> {
+            Map.Entry<String, BundleData> bundle = e.getLeft();
+            double traffic = e.getRight();
+            if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) {
+                selectedBundlesCache.put(overloadedBroker, bundle.getKey());
+                // Remember the destination so selectBroker places this bundle on the underloaded broker.
+                bundleBrokerMap.put(bundle.getValue(), underloadedBroker);
+                trafficMarkedToOffload.add(-traffic);
+                if (log.isDebugEnabled()) {
+                    log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}",
+                            bundle, isMsgRateToOffload, traffic);
+                }
+            }
+        });
+    }
+
+    /**
+     * Explicit override that only delegates to the {@link LoadSheddingStrategy} default implementation.
+     * NOTE(review): hit counters and bundleBrokerMap entries of brokers that left are not pruned here.
+     */
+    @Override
+    public void onActiveBrokersChange(Set<String> activeBrokers) {
+        LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers);
+    }
+
+    /**
+     * Recomputes {@code brokerScoreMap} for every broker in {@code loadData}.
+     *
+     * @return the brokers ordered by ascending score
+     */
+    private List<String> calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf) {
+        brokerScoreMap.clear();
+
+        // calculate scores of brokers.
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            LocalBrokerData localBrokerData = entry.getValue().getLocalData();
+            String broker = entry.getKey();
+            Double score = calculateScores(localBrokerData, conf);
+            brokerScoreMap.put(broker, score);
+            if (log.isDebugEnabled()) {
+                log.debug("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score,
+                        localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(),
+                        localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut());
+            }
+        }
+
+        // Sort by ascending score, comparing the doubles directly. Casting the difference to int would treat
+        // scores less than 1 apart as equal, which makes the comparator non-transitive and the order inexact.
+        return brokerScoreMap.entrySet().stream()
+                .sorted(Map.Entry.comparingByValue())
+                .map(Map.Entry::getKey)
+                .toList();
+    }
+
+    // Score = weighted maximum resource usage (CPU, direct memory, bandwidth in/out weights) multiplied by 100.
+    private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) {
+        return localBrokerData.getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight()) * 100;
+    }
+
+    /**
+     * Pairs brokers from the outside in (lowest with highest score, and so on) and updates the hit counters.
+     *
+     * <p>For each pair:
+     * <ul>
+     *   <li>below {@code lowThreshold}, both brokers' counters are reset;</li>
+     *   <li>between the two thresholds, the low counters are incremented and the high counters reset;</li>
+     *   <li>at or above {@code highThreshold}, both the low and the high counters are incremented.</li>
+     * </ul>
+     *
+     * @param brokers brokers sorted by ascending score
+     * @return (underloaded, overloaded) broker pairs whose score difference reaches {@code lowThreshold}
+     */
+    private List<Pair<String, String>> findBrokerPairs(List<String> brokers,
+                                                       double lowThreshold, double highThreshold) {
+        List<Pair<String, String>> pairs = new LinkedList<>();
+        int i = 0;
+        int j = brokers.size() - 1;
+        while (i <= j) {
+            String maxBroker = brokers.get(j);
+            String minBroker = brokers.get(i);
+            double scoreDiff = brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker);
+            // The middle broker of an odd-sized list meets itself (i == j). Never pair a broker with itself,
+            // which would otherwise happen when lowThreshold <= 0.
+            if (i == j || scoreDiff < lowThreshold) {
+                brokerHitCountForHigh.remove(maxBroker);
+                brokerHitCountForHigh.remove(minBroker);
+
+                brokerHitCountForLow.remove(maxBroker);
+                brokerHitCountForLow.remove(minBroker);
+            } else {
+                pairs.add(Pair.of(minBroker, maxBroker));
+                // Any pair at or above the low threshold counts as a low hit.
+                brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment();
+                brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment();
+                if (scoreDiff < highThreshold) {
+                    brokerHitCountForHigh.remove(maxBroker);
+                    brokerHitCountForHigh.remove(minBroker);
+                } else {
+                    brokerHitCountForHigh.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment();
+                    brokerHitCountForHigh.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment();
+                }
+            }
+            i++;
+            j--;
+        }
+        return pairs;
+    }
+
+    /**
+     * Chooses the broker for {@code bundleToAssign}.
+     *
+     * <p>If the bundle is mapped to a broker (by shedding or a previous call) and that broker is still a
+     * candidate, that broker is returned. Otherwise a candidate is picked pseudo-randomly and remembered for
+     * the bundle. {@code loadData} and {@code conf} are not used.
+     *
+     * @return the selected broker, or {@link Optional#empty()} when {@code candidates} is empty
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        if (candidates.isEmpty()) {
+            // Without this guard the random pick below would fail with a division by zero.
+            log.warn("No candidate broker is available for bundle {}", bundleToAssign);
+            return Optional.empty();
+        }
+
+        final String expectedBroker = bundleBrokerMap.get(bundleToAssign);
+        if (expectedBroker != null && candidates.contains(expectedBroker)) {
+            return Optional.of(expectedBroker);
+        }
+
+        // cluster initializing or broker is shutdown
+        if (log.isDebugEnabled()) {
+            if (expectedBroker == null) {
+                log.debug("cluster is initializing");
+            } else {
+                log.debug("expected broker:{} is shutdown, candidates:{}", expectedBroker, candidates);
+            }
+        }
+        String broker = getExpectedBroker(candidates, bundleToAssign);
+        bundleBrokerMap.put(bundleToAssign, broker);
+        return Optional.of(broker);
+    }
+
+    /**
+     * Picks a pseudo-random broker from {@code brokers}, which are sorted first.
+     * {@code brokers} must not be empty.
+     */
+    private static String getExpectedBroker(Collection<String> brokers, BundleData bundle) {
+        List<String> sortedBrokers = new ArrayList<>(brokers);
+        Collections.sort(sortedBrokers);
+
+        try {
+            // Use a random number as the hash input to avoid this special case: with 4 brokers running,
+            // adding broker5 and shutting down broker3 would load all bundles of broker3 onto the same broker.
+            final long hashcode = Hashing.crc32().hashString(String.valueOf(new Random().nextInt()),
+                    StandardCharsets.UTF_8).padToLong();
+            final int index = (int) (Math.abs(hashcode) % sortedBrokers.size());
+            if (log.isDebugEnabled()) {
+                log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}",
+                        sortedBrokers, bundle, hashcode, index);
+            }
+            return sortedBrokers.get(index);
+        } catch (Throwable e) {
+            // theoretically this logic branch should not be executed
+            log.error("Bundle format of {} is invalid", bundle, e);
+            return sortedBrokers.get(Math.abs(bundle.hashCode()) % sortedBrokers.size());
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d895cbb3fcf..c9ff07bf60e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -266,7 +266,21 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                             () -> 
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, 
brokerToFailureDomainMap));
                 });
 
-        loadSheddingPipeline.add(createLoadSheddingStrategy());
+        if (placementStrategy instanceof LoadSheddingStrategy) {
+            // if the placement strategy is also a load shedding strategy
+            // we need to check two strategies are the same
+            if (!conf.getLoadBalancerLoadSheddingStrategy().equals(
+                    conf.getLoadBalancerPlacementStrategy())) {
+                throw new IllegalArgumentException("The load shedding 
strategy: "
+                        + conf.getLoadBalancerLoadSheddingStrategy()
+                        + " can't work with the placement strategy: "
+                        + conf.getLoadBalancerPlacementStrategy());
+            }
+            // bind the load shedding strategy and the placement strategy
+            loadSheddingPipeline.add((LoadSheddingStrategy) placementStrategy);
+        } else {
+            loadSheddingPipeline.add(createLoadSheddingStrategy());
+        }
     }
 
     public void handleDataNotification(Notification t) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
index c64c9950a95..19998198375 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.AvgShedder;
 import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
 import org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight;
 import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
@@ -47,6 +49,47 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class ModularLoadManagerStrategyTest {
 
+    public void testAvgShedderWithPreassignedBroker() throws Exception {
+        final ModularLoadManagerStrategy strategy = new AvgShedder();
+        // Reach into the strategy to seed a pre-assigned broker for the bundle.
+        final Field mappingField = AvgShedder.class.getDeclaredField("bundleBrokerMap");
+        mappingField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<BundleData, String> assignments = (Map<BundleData, String>) mappingField.get(strategy);
+        final BundleData bundle = new BundleData();
+
+        // Broker "1" is pre-assigned and still a candidate, so it must be kept.
+        assignments.put(bundle, "1");
+        final Set<String> allBrokers = Set.of("1", "2", "3");
+        assertEquals(strategy.selectBroker(allBrokers, bundle, null, null), Optional.of("1"));
+        assertEquals(assignments.get(bundle), "1");
+
+        // Once "1" is no longer a candidate, the only remaining broker "2" is chosen and remembered.
+        assertEquals(strategy.selectBroker(Set.of("2"), bundle, null, null), Optional.of("2"));
+        assertEquals(assignments.get(bundle), "2");
+    }
+
+    public void testAvgShedderWithoutPreassignedBroker() throws Exception {
+        final ModularLoadManagerStrategy strategy = new AvgShedder();
+        // Expose the strategy's bundle -> broker mapping so the remembered choice can be checked.
+        final Field mappingField = AvgShedder.class.getDeclaredField("bundleBrokerMap");
+        mappingField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<BundleData, String> assignments = (Map<BundleData, String>) mappingField.get(strategy);
+        final BundleData bundle = new BundleData();
+        // Must stay mutable: the first pick is removed further down.
+        final Set<String> candidates = new HashSet<>(Arrays.asList("1", "2", "3"));
+
+        // Nothing is pre-assigned, so some candidate is picked at random and remembered.
+        Optional<String> chosen = strategy.selectBroker(candidates, bundle, null, null);
+        assertTrue(chosen.isPresent());
+        assertTrue(candidates.contains(chosen.get()));
+        assertEquals(assignments.get(bundle), chosen.get());
+
+        // After dropping the remembered broker, one of the remaining candidates must be picked instead.
+        candidates.remove(chosen.get());
+        chosen = strategy.selectBroker(candidates, bundle, null, null);
+        assertTrue(chosen.isPresent());
+        assertTrue(candidates.contains(chosen.get()));
+        assertEquals(assignments.get(bundle), chosen.get());
+    }
+
     // Test that least long term message rate works correctly.
     public void testLeastLongTermMessageRate() {
         BundleData bundleData = new BundleData();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
new file mode 100644
index 00000000000..215e3d766a9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.Multimap;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+@Test(groups = "broker")
+public class AvgShedderTest {
+    private AvgShedder avgShedder;
+    private final ServiceConfiguration conf;
+
+    public AvgShedderTest() {
+        conf = new ServiceConfiguration();
+    }
+
+    @BeforeMethod
+    public void setup() {
+        avgShedder = new AvgShedder();
+    }
+
+    private BrokerData initBrokerData() {
+        LocalBrokerData localBrokerData = new LocalBrokerData();
+        localBrokerData.setCpu(new ResourceUsage());
+        localBrokerData.setMemory(new ResourceUsage());
+        localBrokerData.setBandwidthIn(new ResourceUsage());
+        localBrokerData.setBandwidthOut(new ResourceUsage());
+        BrokerData brokerData = new BrokerData(localBrokerData);
+        TimeAverageBrokerData timeAverageBrokerData = new 
TimeAverageBrokerData();
+        brokerData.setTimeAverageData(timeAverageBrokerData);
+        return brokerData;
+    }
+
+    @Test
+    public void testHitHighThreshold() {
+        LoadData loadData = new LoadData();
+        BrokerData brokerData1 = initBrokerData();
+        BrokerData brokerData2 = initBrokerData();
+        BrokerData brokerData3 = initBrokerData();
+        loadData.getBrokerData().put("broker1", brokerData1);
+        loadData.getBrokerData().put("broker2", brokerData2);
+        loadData.getBrokerData().put("broker3", brokerData3);
+        // AvgShedder will distribute the load evenly between the highest and 
lowest brokers
+        conf.setMaxUnloadPercentage(0.5);
+
+        // Set the high threshold to 40% and hit count high threshold to 2
+        int hitCountForHighThreshold = 2;
+        conf.setLoadBalancerAvgShedderHighThreshold(40);
+        
conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold);
+        brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100));
+        brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100));
+        brokerData1.getLocalData().setMsgRateIn(10000);
+        brokerData1.getLocalData().setMsgRateOut(10000);
+        brokerData2.getLocalData().setMsgRateIn(1000);
+        brokerData2.getLocalData().setMsgRateOut(1000);
+
+        // broker3 is in the middle
+        brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100));
+        brokerData3.getLocalData().setMsgRateIn(5000);
+        brokerData3.getLocalData().setMsgRateOut(5000);
+
+        // expect to shed bundles with message rate(in+out) 
((10000+10000)-(1000+1000))/2 = 9000
+        // each bundle with 450 msg rate in and 450 msg rate out
+        // so 9000/(450+450)=10 bundles will be shed
+        for (int i = 0; i < 11; i++) {
+            brokerData1.getLocalData().getBundles().add("bundle-" + i);
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+            timeAverageMessageData.setMsgRateIn(450);
+            timeAverageMessageData.setMsgRateOut(450);
+            // as AvgShedder map BundleData to broker, the hashCode of 
different BundleData should be different
+            // so we need to set some different fields to make the hashCode 
different
+            timeAverageMessageData.setNumSamples(i);
+            bundle.setShortTermData(timeAverageMessageData);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+        }
+
+        // do shedding for the first time, expect to shed nothing because hit 
count is not enough
+        Multimap<String, String> bundlesToUnload = 
avgShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 0);
+
+        // do shedding for the second time, expect to shed 10 bundles
+        bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 10);
+
+        // assert that all the bundles are shed from broker1
+        for (String broker : bundlesToUnload.keys()) {
+            assertEquals(broker, "broker1");
+        }
+        // assert that all the bundles are shed to broker2
+        for (String bundle : bundlesToUnload.values()) {
+            BundleData bundleData = loadData.getBundleData().get(bundle);
+            
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), 
bundleData, loadData, conf).get(), "broker2");
+        }
+    }
+
+    @Test
+    public void testHitLowThreshold() {
+        LoadData loadData = new LoadData();
+        BrokerData brokerData1 = initBrokerData();
+        BrokerData brokerData2 = initBrokerData();
+        BrokerData brokerData3 = initBrokerData();
+        loadData.getBrokerData().put("broker1", brokerData1);
+        loadData.getBrokerData().put("broker2", brokerData2);
+        loadData.getBrokerData().put("broker3", brokerData3);
+        // AvgShedder will distribute the load evenly between the highest and 
lowest brokers
+        conf.setMaxUnloadPercentage(0.5);
+
+        // Set the low threshold to 20% and hit count low threshold to 6
+        int hitCountForLowThreshold = 6;
+        conf.setLoadBalancerAvgShedderLowThreshold(20);
+        
conf.setLoadBalancerAvgShedderHitCountLowThreshold(hitCountForLowThreshold);
+        brokerData1.getLocalData().setCpu(new ResourceUsage(60, 100));
+        brokerData2.getLocalData().setCpu(new ResourceUsage(40, 100));
+        brokerData1.getLocalData().setMsgRateIn(10000);
+        brokerData1.getLocalData().setMsgRateOut(10000);
+        brokerData2.getLocalData().setMsgRateIn(1000);
+        brokerData2.getLocalData().setMsgRateOut(1000);
+
+        // broker3 is in the middle
+        brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100));
+        brokerData3.getLocalData().setMsgRateIn(5000);
+        brokerData3.getLocalData().setMsgRateOut(5000);
+
+        // expect to shed bundles with message rate(in+out) 
((10000+10000)-(1000+1000))/2 = 9000
+        // each bundle with 450 msg rate in and 450 msg rate out
+        // so 9000/(450+450)=10 bundles will be shed
+        for (int i = 0; i < 11; i++) {
+            brokerData1.getLocalData().getBundles().add("bundle-" + i);
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+            timeAverageMessageData.setMsgRateIn(450);
+            timeAverageMessageData.setMsgRateOut(450);
+            // as AvgShedder map BundleData to broker, the hashCode of 
different BundleData should be different
+            // so we need to set some different fields to make the hashCode 
different
+            timeAverageMessageData.setNumSamples(i);
+            bundle.setShortTermData(timeAverageMessageData);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+        }
+
+        // do shedding for (lowCountForHighThreshold - 1) times, expect to 
shed nothing because hit count is not enough
+        Multimap<String, String> bundlesToUnload;
+        for (int i = 0; i < hitCountForLowThreshold - 1; i++) {
+            bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, 
conf);
+            assertEquals(bundlesToUnload.size(), 0);
+        }
+
+        // do shedding for the last time, expect to shed 10 bundles
+        bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 10);
+
+        // assert that all the bundles are shed from broker1
+        for (String broker : bundlesToUnload.keys()) {
+            assertEquals(broker, "broker1");
+        }
+        // assert that all the bundles are shed to broker2
+        for (String bundle : bundlesToUnload.values()) {
+            BundleData bundleData = loadData.getBundleData().get(bundle);
+            
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), 
bundleData, loadData, conf).get(), "broker2");
+        }
+    }
+
+    @Test
+    public void testSheddingMultiplePairs() {
+        LoadData loadData = new LoadData();
+        BrokerData brokerData1 = initBrokerData();
+        BrokerData brokerData2 = initBrokerData();
+        BrokerData brokerData3 = initBrokerData();
+        BrokerData brokerData4 = initBrokerData();
+        loadData.getBrokerData().put("broker1", brokerData1);
+        loadData.getBrokerData().put("broker2", brokerData2);
+        loadData.getBrokerData().put("broker3", brokerData3);
+        loadData.getBrokerData().put("broker4", brokerData4);
+        // AvgShedder will distribute the load evenly between the highest and 
lowest brokers
+        conf.setMaxUnloadPercentage(0.5);
+
+        // Set the high threshold to 40% and hit count high threshold to 2
+        int hitCountForHighThreshold = 2;
+        conf.setLoadBalancerAvgShedderHighThreshold(40);
+        
conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold);
+
+        // pair broker1 and broker2
+        brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100));
+        brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100));
+        brokerData1.getLocalData().setMsgRateIn(10000);
+        brokerData1.getLocalData().setMsgRateOut(10000);
+        brokerData2.getLocalData().setMsgRateIn(1000);
+        brokerData2.getLocalData().setMsgRateOut(1000);
+
+        // pair broker3 and broker4
+        brokerData3.getLocalData().setCpu(new ResourceUsage(75, 100));
+        brokerData3.getLocalData().setMsgRateIn(10000);
+        brokerData3.getLocalData().setMsgRateOut(10000);
+        brokerData4.getLocalData().setCpu(new ResourceUsage(35, 100));
+        brokerData4.getLocalData().setMsgRateIn(1000);
+        brokerData4.getLocalData().setMsgRateOut(1000);
+
+        // expect to shed bundles with message rate(in+out) 
((10000+10000)-(1000+1000))/2 = 9000
+        // each bundle with 450 msg rate in and 450 msg rate out
+        // so 9000/(450+450)=10 bundles will be shed
+        for (int i = 0; i < 11; i++) {
+            brokerData1.getLocalData().getBundles().add("bundle1-" + i);
+            brokerData3.getLocalData().getBundles().add("bundle3-" + i);
+
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+            timeAverageMessageData.setMsgRateIn(450);
+            timeAverageMessageData.setMsgRateOut(450);
+            // as AvgShedder map BundleData to broker, the hashCode of 
different BundleData should be different
+            // so we need to set some different fields to make the hashCode 
different
+            timeAverageMessageData.setNumSamples(i);
+            bundle.setShortTermData(timeAverageMessageData);
+            loadData.getBundleData().put("bundle1-" + i, bundle);
+
+            bundle = new BundleData();
+            timeAverageMessageData = new TimeAverageMessageData();
+            timeAverageMessageData.setMsgRateIn(450);
+            timeAverageMessageData.setMsgRateOut(450);
+            timeAverageMessageData.setNumSamples(i+11);
+            bundle.setShortTermData(timeAverageMessageData);
+            loadData.getBundleData().put("bundle3-" + i, bundle);
+        }
+
+        // do shedding for the first time, expect to shed nothing because hit 
count is not enough
+        Multimap<String, String> bundlesToUnload = 
avgShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 0);
+
+        // do shedding for the second time, expect to shed 10*2=20 bundles
+        bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 20);
+
+        // assert that half of the bundles are shed from broker1, and the 
other half are shed from broker3
+        for (String broker : bundlesToUnload.keys()) {
+            if (broker.equals("broker1")) {
+                assertEquals(bundlesToUnload.get(broker).size(), 10);
+            } else if (broker.equals("broker3")) {
+                assertEquals(bundlesToUnload.get(broker).size(), 10);
+            } else {
+                fail();
+            }
+        }
+
+        // assert that all the bundles from broker1 are shed to broker2, and 
all the bundles from broker3 are shed to broker4
+        for (String bundle : bundlesToUnload.values()) {
+            BundleData bundleData = loadData.getBundleData().get(bundle);
+            if (bundle.startsWith("bundle1-")) {
+                
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), 
bundleData, loadData, conf).get(), "broker2");
+            } else if (bundle.startsWith("bundle3-")) {
+                
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), 
bundleData, loadData, conf).get(), "broker4");
+            } else {
+                fail();
+            }
+        }
+    }
+}


Reply via email to