This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d58461d7d56 [improve][broker][branch-3.0] PIP-364: Introduce a new
load balance algorithm AvgShedder (#23053)
d58461d7d56 is described below
commit d58461d7d562610d648fdf7e5629484e3693a84c
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jul 19 15:52:38 2024 +0800
[improve][broker][branch-3.0] PIP-364: Introduce a new load balance
algorithm AvgShedder (#23053)
---
conf/broker.conf | 19 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 36 ++-
.../pulsar/broker/loadbalance/impl/AvgShedder.java | 318 +++++++++++++++++++++
.../loadbalance/impl/ModularLoadManagerImpl.java | 16 +-
.../ModularLoadManagerStrategyTest.java | 43 +++
.../broker/loadbalance/impl/AvgShedderTest.java | 283 ++++++++++++++++++
6 files changed, 711 insertions(+), 4 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2a9641b5b90..f07394a84df 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1418,6 +1418,25 @@ loadBalancerBundleUnloadMinThroughputThreshold=10
# Time to wait for the unloading of a namespace bundle
namespaceBundleUnloadingTimeoutMs=60000
+# configuration for AvgShedder, a new shedding and placement strategy
+# The low threshold for the difference between the highest and lowest loaded
brokers.
+loadBalancerAvgShedderLowThreshold = 15
+
+# The high threshold for the difference between the highest and lowest loaded
brokers.
+loadBalancerAvgShedderHighThreshold = 40
+
+# The number of times the low threshold is triggered before the bundle is
unloaded.
+loadBalancerAvgShedderHitCountLowThreshold = 8
+
+# The number of times the high threshold is triggered before the bundle is
unloaded.
+loadBalancerAvgShedderHitCountHighThreshold = 2
+
+# In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio.
+# For AvgShedder, it is recommended to set this to 0.5, so that the load is distributed evenly
+# between the highest and lowest loaded brokers.
+maxUnloadPercentage = 0.2
+
+
### --- Load balancer extension --- ###
# Option to enable the debug mode for the load balancer logics.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6d4635681ed..b3bc0f7e414 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2395,21 +2395,51 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
- doc = "In the UniformLoadShedder strategy, the minimum message
that triggers unload."
+ doc = "The low threshold for the difference between the highest
and lowest loaded brokers."
+ )
+ private int loadBalancerAvgShedderLowThreshold = 15;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "The high threshold for the difference between the highest
and lowest loaded brokers."
+ )
+ private int loadBalancerAvgShedderHighThreshold = 40;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "The number of times the low threshold is triggered before
the bundle is unloaded."
+ )
+ private int loadBalancerAvgShedderHitCountLowThreshold = 8;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "The number of times the high threshold is triggered before
the bundle is unloaded."
+ )
+ private int loadBalancerAvgShedderHitCountHighThreshold = 2;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "In the UniformLoadShedder and AvgShedder strategy, the
minimum message that triggers unload."
)
private int minUnloadMessage = 1000;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
- doc = "In the UniformLoadShedder strategy, the minimum throughput
that triggers unload."
+ doc = "In the UniformLoadShedder and AvgShedder strategy, the
minimum throughput that triggers unload."
)
private int minUnloadMessageThroughput = 1 * 1024 * 1024;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
- doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
+ doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio. "
+ + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load "
+ + "evenly between the highest and lowest brokers."
)
private double maxUnloadPercentage = 0.2;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
new file mode 100644
index 00000000000..e33ff097164
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
+
+@Slf4j
+public class AvgShedder implements LoadSheddingStrategy,
ModularLoadManagerStrategy {
+ // map bundle to broker: written when a bundle is selected for unloading (target = underloaded broker)
+ // and when selectBroker falls back to a random candidate; read by selectBroker.
+ // NOTE(review): keyed by the BundleData object itself, so lookups presumably depend on BundleData's
+ // equals/hashCode staying stable while an entry is live — confirm.
+ private final Map<BundleData, String> bundleBrokerMap = new HashMap<>();
+ // map broker to Scores, cleared and refilled on every calculateScoresAndSort call. scores:0-100
+ private final Map<String, Double> brokerScoreMap = new HashMap<>();
+ // map broker hit count for high threshold/low threshold: incremented by findBrokerPairs while the
+ // broker's pair stays at/above the respective threshold, removed once the pair sheds or drops below it.
+ private final Map<String, MutableInt> brokerHitCountForHigh = new
HashMap<>();
+ private final Map<String, MutableInt> brokerHitCountForLow = new
HashMap<>();
+ // bytes per MByte; only used to print throughput as MByte/s in log messages.
+ private static final double MB = 1024 * 1024;
+
+    /**
+     * Selects the bundles that should be unloaded in this shedding round.
+     *
+     * <p>Brokers are scored and sorted, then paired lowest-with-highest by {@code findBrokerPairs}. A pair only
+     * sheds once at least one of its two brokers has reached the configured hit count for the high or for the
+     * low threshold; the counters of both brokers are dropped as soon as the pair is handed to bundle selection.
+     * When no pair exceeds the low threshold, all hit counters are cleared.
+     *
+     * @param loadData the load data to read broker and bundle statistics from
+     * @param conf the configuration supplying thresholds, hit counts and unload limits
+     * @return a multimap from overloaded broker to the bundles to unload from it; empty when nothing qualifies
+     */
+    @Override
+    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
+        // result returned by shedding, map broker to bundles.
+        Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
+
+        // configuration for shedding.
+        final double minThroughputThreshold = conf.getMinUnloadMessageThroughput();
+        final double minMsgThreshold = conf.getMinUnloadMessage();
+        final double maxUnloadPercentage = conf.getMaxUnloadPercentage();
+        final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold();
+        final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold();
+        final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold();
+        final int hitCountLowThreshold = conf.getLoadBalancerAvgShedderHitCountLowThreshold();
+        if (log.isDebugEnabled()) {
+            log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, "
+                            + "minMsgThreshold:{}, minThroughputThreshold:{}",
+                    highThreshold, lowThreshold, hitCountHighThreshold, hitCountLowThreshold,
+                    minMsgThreshold, minThroughputThreshold);
+        }
+
+        List<String> brokers = calculateScoresAndSort(loadData, conf);
+        log.info("sorted broker list:{}", brokers);
+
+        // find broker pairs for shedding.
+        List<Pair<String, String>> pairs = findBrokerPairs(brokers, lowThreshold, highThreshold);
+        log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}",
+                brokerHitCountForHigh, brokerHitCountForLow);
+        if (pairs.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("there is no any overload broker, no need to shedding bundles.");
+            }
+            brokerHitCountForHigh.clear();
+            brokerHitCountForLow.clear();
+            return selectedBundlesCache;
+        }
+
+        // choosing bundles to unload.
+        for (Pair<String, String> pair : pairs) {
+            String overloadedBroker = pair.getRight();
+            String underloadedBroker = pair.getLeft();
+
+            // check hit count for high threshold and low threshold: one broker of the pair reaching
+            // either hit count is enough to trigger shedding for the pair.
+            final boolean hit =
+                    hasReachedHitCount(brokerHitCountForHigh, underloadedBroker, hitCountHighThreshold)
+                            || hasReachedHitCount(brokerHitCountForHigh, overloadedBroker, hitCountHighThreshold)
+                            || hasReachedHitCount(brokerHitCountForLow, underloadedBroker, hitCountLowThreshold)
+                            || hasReachedHitCount(brokerHitCountForLow, overloadedBroker, hitCountLowThreshold);
+            if (!hit) {
+                continue;
+            }
+
+            // if hit, remove entry.
+            brokerHitCountForHigh.remove(underloadedBroker);
+            brokerHitCountForHigh.remove(overloadedBroker);
+            brokerHitCountForLow.remove(underloadedBroker);
+            brokerHitCountForLow.remove(overloadedBroker);
+
+            // select bundle for unloading.
+            selectBundleForUnloading(loadData, overloadedBroker, underloadedBroker, minThroughputThreshold,
+                    minMsgThreshold, maxUnloadPercentage, selectedBundlesCache);
+        }
+        return selectedBundlesCache;
+    }
+
+    /**
+     * Tells whether {@code broker} has been hit at least {@code threshold} times according to {@code hitCounts}.
+     * A broker without an entry counts as zero hits; unlike {@code computeIfAbsent}, this check never inserts
+     * placeholder entries into the counter map.
+     */
+    private static boolean hasReachedHitCount(Map<String, MutableInt> hitCounts, String broker, int threshold) {
+        final MutableInt hits = hitCounts.get(broker);
+        return (hits == null ? 0 : hits.intValue()) >= threshold;
+    }
+
+    /**
+     * Picks bundles on {@code overloadedBroker} that should move to {@code underloadedBroker}.
+     *
+     * <p>The amount to move is {@code maxUnloadPercentage} of the difference between the two brokers, measured
+     * in message rate when that amount exceeds {@code minMsgThreshold}, otherwise in throughput when it exceeds
+     * {@code minThroughputThreshold}; if neither does, nothing is unloaded. Bundles hosted on the overloaded
+     * broker and not recently unloaded are visited in descending short-term traffic order and taken greedily
+     * while they still fit into the remaining amount. Each selected bundle is added to
+     * {@code selectedBundlesCache} and recorded in {@code bundleBrokerMap} with the underloaded broker as target.
+     *
+     * @param loadData the load data to read broker and bundle statistics from
+     * @param overloadedBroker the broker to unload bundles from
+     * @param underloadedBroker the broker the unloaded bundles are meant for
+     * @param minThroughputThreshold minimum throughput (bytes/s) worth unloading
+     * @param minMsgThreshold minimum message rate worth unloading
+     * @param maxUnloadPercentage fraction of the load difference to move
+     * @param selectedBundlesCache output multimap, overloaded broker to selected bundles
+     */
+    private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker,
+                                          double minThroughputThreshold, double minMsgThreshold,
+                                          double maxUnloadPercentage,
+                                          Multimap<String, String> selectedBundlesCache) {
+        // calculate how much throughput to unload.
+        LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData();
+        LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData();
+
+        double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut();
+        double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut();
+
+        double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut();
+        double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut();
+
+        double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage;
+        double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage;
+
+        boolean isMsgRateToOffload;
+        MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+
+        // message rate takes precedence over throughput when both exceed their minimum.
+        if (msgRequiredFromUnloadedBundles > minMsgThreshold) {
+            isMsgRateToOffload = true;
+            trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles);
+        } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) {
+            isMsgRateToOffload = false;
+            trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles);
+        } else {
+            log.info(
+                    "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is "
+                            + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s"
+                            + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.",
+                    overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB,
+                    minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold);
+            return;
+        }
+
+        if (maxLocalBrokerData.getBundles().size() == 1) {
+            log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+                            + "No Load Shedding will be done on this broker",
+                    maxLocalBrokerData.getBundles().iterator().next(), overloadedBroker);
+            // honour the message above: moving the only bundle would just relocate the hot spot.
+            return;
+        } else if (maxLocalBrokerData.getBundles().isEmpty()) {
+            log.warn("Broker {} is overloaded despite having no bundles", overloadedBroker);
+            // nothing that could be unloaded.
+            return;
+        }
+
+        // do shedding
+        log.info(
+                "broker:[{}] is planning to shed bundles to broker:[{}]. "
+                        + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+                        + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+                        + "isMsgRateToOffload:{}, trafficMarkedToOffload:{}",
+                overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput,
+                maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate,
+                isMsgRateToOffload, trafficMarkedToOffload);
+
+        loadData.getBundleDataForLoadShedding().entrySet().stream()
+                .filter(e -> maxLocalBrokerData.getBundles().contains(e.getKey()))
+                .filter(e -> !loadData.getRecentlyUnloadedBundles().containsKey(e.getKey()))
+                .map(e -> {
+                    TimeAverageMessageData shortTermData = e.getValue().getShortTermData();
+                    double traffic = isMsgRateToOffload
+                            ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut()
+                            : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
+                    return Pair.of(e, traffic);
+                })
+                // heaviest bundles first.
+                .sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight()))
+                .forEach(e -> {
+                    Map.Entry<String, BundleData> bundle = e.getLeft();
+                    double traffic = e.getRight();
+                    // greedy fill: take the bundle only if it still fits into the remaining amount.
+                    if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) {
+                        selectedBundlesCache.put(overloadedBroker, bundle.getKey());
+                        bundleBrokerMap.put(bundle.getValue(), underloadedBroker);
+                        trafficMarkedToOffload.add(-traffic);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}",
+                                    bundle, isMsgRateToOffload, traffic);
+                        }
+                    }
+                });
+    }
+
+ // Delegates to the default implementation inherited from LoadSheddingStrategy.
+ // NOTE(review): presumably this explicit override exists because both implemented interfaces declare
+ // a default onActiveBrokersChange and the compiler requires the conflict to be resolved — confirm.
+ @Override
+ public void onActiveBrokersChange(Set<String> activeBrokers) {
+ LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers);
+ }
+
+    /**
+     * Recomputes the score of every broker in {@code loadData} and returns the broker names ordered from the
+     * lowest to the highest score.
+     *
+     * <p>{@code brokerScoreMap} is cleared and refilled as a side effect, so it only ever describes the brokers
+     * of the latest call.
+     *
+     * @param loadData the load data providing the local data of each broker
+     * @param conf the configuration providing the resource weights used for scoring
+     * @return broker names sorted by ascending score
+     */
+    private List<String> calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf) {
+        brokerScoreMap.clear();
+
+        // calculate scores of brokers.
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            LocalBrokerData localBrokerData = entry.getValue().getLocalData();
+            String broker = entry.getKey();
+            Double score = calculateScores(localBrokerData, conf);
+            brokerScoreMap.put(broker, score);
+            if (log.isDebugEnabled()) {
+                log.debug("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score,
+                        localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(),
+                        localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut());
+            }
+        }
+
+        // sort brokers by scores. Compare the doubles directly: casting the difference to int would treat
+        // scores less than 1 apart as equal and does not give a consistent ordering.
+        return brokerScoreMap.entrySet().stream()
+                .sorted(Map.Entry.comparingByValue())
+                .map(Map.Entry::getKey)
+                .toList();
+    }
+
+    /**
+     * Scores a broker as its weighted maximum resource usage, scaled by 100.
+     *
+     * @param localBrokerData the broker's local resource data
+     * @param conf the configuration providing the CPU, direct memory and bandwidth in/out weights
+     * @return the weighted maximum resource usage multiplied by 100
+     */
+    private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) {
+        final double weightedMaxUsage = localBrokerData.getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        return weightedMaxUsage * 100;
+    }
+
+    /**
+     * Pairs brokers from both ends of the score-sorted list (lowest with highest, second lowest with second
+     * highest, ...) and updates the hit counters of each pair.
+     *
+     * <ul>
+     *   <li>score gap below {@code lowThreshold}: both counters of both brokers are removed, no pair is
+     *       returned;</li>
+     *   <li>gap at least {@code lowThreshold} but below {@code highThreshold}: the pair is returned, the low
+     *       counters are incremented and the high counters removed;</li>
+     *   <li>gap at least {@code highThreshold}: the pair is returned and both counters are incremented.</li>
+     * </ul>
+     *
+     * <p>With an odd number of brokers the middle broker has no partner; its counters are removed and it is
+     * never paired with itself.
+     *
+     * @param brokers broker names sorted by ascending score; every name must be present in
+     *                {@code brokerScoreMap}
+     * @param lowThreshold minimum score gap for a pair to be considered unbalanced
+     * @param highThreshold score gap from which a pair also counts towards the high hit counter
+     * @return pairs of (underloaded broker, overloaded broker) whose gap is at least {@code lowThreshold}
+     */
+    private List<Pair<String, String>> findBrokerPairs(List<String> brokers,
+                                                       double lowThreshold, double highThreshold) {
+        List<Pair<String, String>> pairs = new LinkedList<>();
+        int i = 0;
+        int j = brokers.size() - 1;
+        while (i < j) {
+            String maxBroker = brokers.get(j);
+            String minBroker = brokers.get(i);
+            final double scoreGap = brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker);
+            if (scoreGap < lowThreshold) {
+                brokerHitCountForHigh.remove(maxBroker);
+                brokerHitCountForHigh.remove(minBroker);
+
+                brokerHitCountForLow.remove(maxBroker);
+                brokerHitCountForLow.remove(minBroker);
+            } else {
+                pairs.add(Pair.of(minBroker, maxBroker));
+                // any gap that reaches the low threshold counts as a low hit.
+                brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment();
+                brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment();
+
+                if (scoreGap < highThreshold) {
+                    brokerHitCountForHigh.remove(maxBroker);
+                    brokerHitCountForHigh.remove(minBroker);
+                } else {
+                    brokerHitCountForHigh.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment();
+                    brokerHitCountForHigh.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment();
+                }
+            }
+            i++;
+            j--;
+        }
+        if (i == j) {
+            // the unpaired middle broker is balanced by definition: drop counters left over from earlier rounds.
+            String middleBroker = brokers.get(i);
+            brokerHitCountForHigh.remove(middleBroker);
+            brokerHitCountForLow.remove(middleBroker);
+        }
+        return pairs;
+    }
+
+    /**
+     * Chooses the broker for {@code bundleToAssign}.
+     *
+     * <p>When {@code bundleBrokerMap} holds a broker for the bundle and that broker is among
+     * {@code candidates}, it is returned. Otherwise a broker is picked by {@code getExpectedBroker}, stored in
+     * {@code bundleBrokerMap} for the bundle, and returned.
+     *
+     * @param candidates the brokers the bundle may be placed on
+     * @param bundleToAssign the bundle to place
+     * @param loadData not used by this implementation
+     * @param conf not used by this implementation
+     * @return the selected broker, never empty
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        final String preassignedBroker = bundleBrokerMap.get(bundleToAssign);
+        if (preassignedBroker != null && candidates.contains(preassignedBroker)) {
+            return Optional.of(preassignedBroker);
+        }
+
+        // cluster initializing or broker is shutdown
+        if (log.isDebugEnabled()) {
+            if (bundleBrokerMap.containsKey(bundleToAssign)) {
+                log.debug("expected broker:{} is shutdown, candidates:{}", preassignedBroker, candidates);
+            } else {
+                log.debug("cluster is initializing");
+            }
+        }
+        final String fallbackBroker = getExpectedBroker(candidates, bundleToAssign);
+        bundleBrokerMap.put(bundleToAssign, fallbackBroker);
+        return Optional.of(fallbackBroker);
+    }
+
+    /**
+     * Picks one of {@code brokers} for a bundle that has no usable pre-assigned broker.
+     *
+     * <p>The brokers are sorted by name and indexed with the CRC32 hash of a fresh random number, so the choice
+     * is effectively random and does not depend on the bundle. If that fails, the bundle's own hash code is used
+     * as index instead.
+     *
+     * @param brokers the candidate brokers; must not be empty, otherwise an {@link ArithmeticException} is
+     *                thrown by the modulo operation
+     * @param bundle the bundle being placed; used for logging and as fallback hash input
+     * @return the selected broker
+     */
+    private static String getExpectedBroker(Collection<String> brokers, BundleData bundle) {
+        List<String> sortedBrokers = new ArrayList<>(brokers);
+        Collections.sort(sortedBrokers);
+
+        try {
+            // use a random number as input of the hashing function to avoid the special case that,
+            // if there are 4 brokers running in the cluster, broker5 is added and broker3 is shut down,
+            // all bundles belonging to broker3 would be loaded on the same broker.
+            final long hashcode = Hashing.crc32().hashString(String.valueOf(new Random().nextInt()),
+                    StandardCharsets.UTF_8).padToLong();
+            final int index = Math.floorMod(hashcode, sortedBrokers.size());
+            if (log.isDebugEnabled()) {
+                log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}",
+                        sortedBrokers, bundle, hashcode, index);
+            }
+            return sortedBrokers.get(index);
+        } catch (Throwable e) {
+            // theoretically this logic branch should not be executed
+            log.error("Bundle format of {} is invalid", bundle, e);
+            // floorMod keeps the index non-negative even for Integer.MIN_VALUE, where Math.abs stays negative.
+            return sortedBrokers.get(Math.floorMod(bundle.hashCode(), sortedBrokers.size()));
+        }
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d895cbb3fcf..c9ff07bf60e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -266,7 +266,21 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
() ->
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar,
brokerToFailureDomainMap));
});
- loadSheddingPipeline.add(createLoadSheddingStrategy());
+ if (placementStrategy instanceof LoadSheddingStrategy) {
+ // if the placement strategy is also a load shedding strategy
+ // we need to check two strategies are the same
+ if (!conf.getLoadBalancerLoadSheddingStrategy().equals(
+ conf.getLoadBalancerPlacementStrategy())) {
+ throw new IllegalArgumentException("The load shedding
strategy: "
+ + conf.getLoadBalancerLoadSheddingStrategy()
+ + " can't work with the placement strategy: "
+ + conf.getLoadBalancerPlacementStrategy());
+ }
+ // bind the load shedding strategy and the placement strategy
+ loadSheddingPipeline.add((LoadSheddingStrategy) placementStrategy);
+ } else {
+ loadSheddingPipeline.add(createLoadSheddingStrategy());
+ }
}
public void handleDataNotification(Notification t) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
index c64c9950a95..19998198375 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.AvgShedder;
import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
import org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
@@ -47,6 +49,47 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class ModularLoadManagerStrategyTest {
+ public void testAvgShedderWithPreassignedBroker() throws Exception {
+ ModularLoadManagerStrategy strategy = new AvgShedder();
+ Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap");
+ field.setAccessible(true);
+ Map<BundleData, String> bundleBrokerMap = (Map<BundleData, String>)
field.get(strategy);
+ BundleData bundleData = new BundleData();
+ // assign bundle to broker1 in bundleBrokerMap.
+ bundleBrokerMap.put(bundleData, "1");
+ assertEquals(strategy.selectBroker(Set.of("1", "2", "3"), bundleData,
null, null), Optional.of("1"));
+ assertEquals(bundleBrokerMap.get(bundleData), "1");
+
+ // remove broker1 in candidates, only broker2 is candidate.
+ assertEquals(strategy.selectBroker(Set.of("2"), bundleData, null,
null), Optional.of("2"));
+ assertEquals(bundleBrokerMap.get(bundleData), "2");
+ }
+
+ public void testAvgShedderWithoutPreassignedBroker() throws Exception {
+ ModularLoadManagerStrategy strategy = new AvgShedder();
+ Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap");
+ field.setAccessible(true);
+ Map<BundleData, String> bundleBrokerMap = (Map<BundleData, String>)
field.get(strategy);
+ BundleData bundleData = new BundleData();
+ Set<String> candidates = new HashSet<>();
+ candidates.add("1");
+ candidates.add("2");
+ candidates.add("3");
+
+ // select broker from candidates randomly.
+ Optional<String> selectedBroker = strategy.selectBroker(candidates,
bundleData, null, null);
+ assertTrue(selectedBroker.isPresent());
+ assertTrue(candidates.contains(selectedBroker.get()));
+ assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get());
+
+ // remove original broker in candidates
+ candidates.remove(selectedBroker.get());
+ selectedBroker = strategy.selectBroker(candidates, bundleData, null,
null);
+ assertTrue(selectedBroker.isPresent());
+ assertTrue(candidates.contains(selectedBroker.get()));
+ assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get());
+ }
+
// Test that least long term message rate works correctly.
public void testLeastLongTermMessageRate() {
BundleData bundleData = new BundleData();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
new file mode 100644
index 00000000000..215e3d766a9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.Multimap;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+@Test(groups = "broker")
+public class AvgShedderTest {
+ private AvgShedder avgShedder;
+ private final ServiceConfiguration conf;
+
+ public AvgShedderTest() {
+ conf = new ServiceConfiguration();
+ }
+
+ @BeforeMethod
+ public void setup() {
+ avgShedder = new AvgShedder();
+ }
+
+ private BrokerData initBrokerData() {
+ LocalBrokerData localBrokerData = new LocalBrokerData();
+ localBrokerData.setCpu(new ResourceUsage());
+ localBrokerData.setMemory(new ResourceUsage());
+ localBrokerData.setBandwidthIn(new ResourceUsage());
+ localBrokerData.setBandwidthOut(new ResourceUsage());
+ BrokerData brokerData = new BrokerData(localBrokerData);
+ TimeAverageBrokerData timeAverageBrokerData = new
TimeAverageBrokerData();
+ brokerData.setTimeAverageData(timeAverageBrokerData);
+ return brokerData;
+ }
+
+ @Test
+ public void testHitHighThreshold() {
+ LoadData loadData = new LoadData();
+ BrokerData brokerData1 = initBrokerData();
+ BrokerData brokerData2 = initBrokerData();
+ BrokerData brokerData3 = initBrokerData();
+ loadData.getBrokerData().put("broker1", brokerData1);
+ loadData.getBrokerData().put("broker2", brokerData2);
+ loadData.getBrokerData().put("broker3", brokerData3);
+ // AvgShedder will distribute the load evenly between the highest and
lowest brokers
+ conf.setMaxUnloadPercentage(0.5);
+
+ // Set the high threshold to 40% and hit count high threshold to 2
+ int hitCountForHighThreshold = 2;
+ conf.setLoadBalancerAvgShedderHighThreshold(40);
+
conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold);
+ brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100));
+ brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100));
+ brokerData1.getLocalData().setMsgRateIn(10000);
+ brokerData1.getLocalData().setMsgRateOut(10000);
+ brokerData2.getLocalData().setMsgRateIn(1000);
+ brokerData2.getLocalData().setMsgRateOut(1000);
+
+ // broker3 is in the middle
+ brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100));
+ brokerData3.getLocalData().setMsgRateIn(5000);
+ brokerData3.getLocalData().setMsgRateOut(5000);
+
+ // expect to shed bundles with message rate(in+out)
((10000+10000)-(1000+1000))/2 = 9000
+ // each bundle with 450 msg rate in and 450 msg rate out
+ // so 9000/(450+450)=10 bundles will be shed
+ for (int i = 0; i < 11; i++) {
+ brokerData1.getLocalData().getBundles().add("bundle-" + i);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgRateIn(450);
+ timeAverageMessageData.setMsgRateOut(450);
+ // as AvgShedder map BundleData to broker, the hashCode of
different BundleData should be different
+ // so we need to set some different fields to make the hashCode
different
+ timeAverageMessageData.setNumSamples(i);
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+ }
+
+ // do shedding for the first time, expect to shed nothing because hit
count is not enough
+ Multimap<String, String> bundlesToUnload =
avgShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 0);
+
+ // do shedding for the second time, expect to shed 10 bundles
+ bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 10);
+
+ // assert that all the bundles are shed from broker1
+ for (String broker : bundlesToUnload.keys()) {
+ assertEquals(broker, "broker1");
+ }
+ // assert that all the bundles are shed to broker2
+ for (String bundle : bundlesToUnload.values()) {
+ BundleData bundleData = loadData.getBundleData().get(bundle);
+
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(),
bundleData, loadData, conf).get(), "broker2");
+ }
+ }
+
+ @Test
+ public void testHitLowThreshold() { // low-threshold path: nothing may be shed until the configured hit count is reached
+ LoadData loadData = new LoadData();
+ BrokerData brokerData1 = initBrokerData();
+ BrokerData brokerData2 = initBrokerData();
+ BrokerData brokerData3 = initBrokerData();
+ loadData.getBrokerData().put("broker1", brokerData1);
+ loadData.getBrokerData().put("broker2", brokerData2);
+ loadData.getBrokerData().put("broker3", brokerData3);
+ // AvgShedder will distribute the load evenly between the highest and
lowest brokers
+ conf.setMaxUnloadPercentage(0.5);
+
+ // Set the low threshold to 20% and hit count low threshold to 6
+ int hitCountForLowThreshold = 6;
+ conf.setLoadBalancerAvgShedderLowThreshold(20);
+
conf.setLoadBalancerAvgShedderHitCountLowThreshold(hitCountForLowThreshold);
+ brokerData1.getLocalData().setCpu(new ResourceUsage(60, 100)); // broker1 gets the highest cpu and message rates
+ brokerData2.getLocalData().setCpu(new ResourceUsage(40, 100)); // broker2 gets the lowest cpu and message rates
+ brokerData1.getLocalData().setMsgRateIn(10000);
+ brokerData1.getLocalData().setMsgRateOut(10000);
+ brokerData2.getLocalData().setMsgRateIn(1000);
+ brokerData2.getLocalData().setMsgRateOut(1000);
+
+ // broker3 is in the middle
+ brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100));
+ brokerData3.getLocalData().setMsgRateIn(5000);
+ brokerData3.getLocalData().setMsgRateOut(5000);
+
+ // expect to shed bundles with message rate(in+out)
((10000+10000)-(1000+1000))/2 = 9000
+ // each bundle with 450 msg rate in and 450 msg rate out
+ // so 9000/(450+450)=10 bundles will be shed
+ for (int i = 0; i < 11; i++) { // 11 bundles are registered on broker1, one more than the 10 expected to be shed
+ brokerData1.getLocalData().getBundles().add("bundle-" + i);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgRateIn(450);
+ timeAverageMessageData.setMsgRateOut(450);
+ // as AvgShedder map BundleData to broker, the hashCode of
different BundleData should be different
+ // so we need to set some different fields to make the hashCode
different
+ timeAverageMessageData.setNumSamples(i);
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+ }
+
+ // do shedding for (hitCountForLowThreshold - 1) times, expect to
shed nothing because hit count is not enough
+ Multimap<String, String> bundlesToUnload;
+ for (int i = 0; i < hitCountForLowThreshold - 1; i++) {
+ bundlesToUnload = avgShedder.findBundlesForUnloading(loadData,
conf);
+ assertEquals(bundlesToUnload.size(), 0);
+ }
+
+ // do shedding for the last time (call number hitCountForLowThreshold), expect to shed 10 bundles
+ bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 10);
+
+ // assert that all the bundles are shed from broker1
+ for (String broker : bundlesToUnload.keys()) {
+ assertEquals(broker, "broker1");
+ }
+ // assert that all the bundles are shed to broker2, i.e. selectBroker returns broker2 for every shed bundle
+ for (String bundle : bundlesToUnload.values()) {
+ BundleData bundleData = loadData.getBundleData().get(bundle);
+
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(),
bundleData, loadData, conf).get(), "broker2");
+ }
+ }
+
+ @Test
+ public void testSheddingMultiplePairs() { // four brokers forming two (high, low) pairs; both pairs are expected to shed on the 2nd call
+ LoadData loadData = new LoadData();
+ BrokerData brokerData1 = initBrokerData();
+ BrokerData brokerData2 = initBrokerData();
+ BrokerData brokerData3 = initBrokerData();
+ BrokerData brokerData4 = initBrokerData();
+ loadData.getBrokerData().put("broker1", brokerData1);
+ loadData.getBrokerData().put("broker2", brokerData2);
+ loadData.getBrokerData().put("broker3", brokerData3);
+ loadData.getBrokerData().put("broker4", brokerData4);
+ // AvgShedder will distribute the load evenly between the highest and
lowest brokers
+ conf.setMaxUnloadPercentage(0.5);
+
+ // Set the high threshold to 40% and hit count high threshold to 2
+ int hitCountForHighThreshold = 2;
+ conf.setLoadBalancerAvgShedderHighThreshold(40);
+
conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold);
+
+ // pair broker1 and broker2 (highest and lowest cpu in this setup)
+ brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100));
+ brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100));
+ brokerData1.getLocalData().setMsgRateIn(10000);
+ brokerData1.getLocalData().setMsgRateOut(10000);
+ brokerData2.getLocalData().setMsgRateIn(1000);
+ brokerData2.getLocalData().setMsgRateOut(1000);
+
+ // pair broker3 and broker4 (second highest and second lowest cpu in this setup)
+ brokerData3.getLocalData().setCpu(new ResourceUsage(75, 100));
+ brokerData3.getLocalData().setMsgRateIn(10000);
+ brokerData3.getLocalData().setMsgRateOut(10000);
+ brokerData4.getLocalData().setCpu(new ResourceUsage(35, 100));
+ brokerData4.getLocalData().setMsgRateIn(1000);
+ brokerData4.getLocalData().setMsgRateOut(1000);
+
+ // expect to shed bundles with message rate(in+out)
((10000+10000)-(1000+1000))/2 = 9000
+ // each bundle with 450 msg rate in and 450 msg rate out
+ // so 9000/(450+450)=10 bundles will be shed (per pair)
+ for (int i = 0; i < 11; i++) { // 11 bundles each are registered on broker1 ("bundle1-*") and broker3 ("bundle3-*")
+ brokerData1.getLocalData().getBundles().add("bundle1-" + i);
+ brokerData3.getLocalData().getBundles().add("bundle3-" + i);
+
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgRateIn(450);
+ timeAverageMessageData.setMsgRateOut(450);
+ // as AvgShedder map BundleData to broker, the hashCode of
different BundleData should be different
+ // so we need to set some different fields to make the hashCode
different
+ timeAverageMessageData.setNumSamples(i);
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put("bundle1-" + i, bundle);
+
+ bundle = new BundleData();
+ timeAverageMessageData = new TimeAverageMessageData();
+ timeAverageMessageData.setMsgRateIn(450);
+ timeAverageMessageData.setMsgRateOut(450);
+ timeAverageMessageData.setNumSamples(i+11); // offset by 11 so the value never collides with the 0..10 used for bundle1-*
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put("bundle3-" + i, bundle);
+ }
+
+ // do shedding for the first time, expect to shed nothing because hit
count is not enough
+ Multimap<String, String> bundlesToUnload =
avgShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 0);
+
+ // do shedding for the second time, expect to shed 10*2=20 bundles
+ bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 20);
+
+ // assert that half of the bundles are shed from broker1, and the
other half are shed from broker3
+ for (String broker : bundlesToUnload.keys()) {
+ if (broker.equals("broker1")) {
+ assertEquals(bundlesToUnload.get(broker).size(), 10);
+ } else if (broker.equals("broker3")) {
+ assertEquals(bundlesToUnload.get(broker).size(), 10);
+ } else {
+ fail(); // no broker other than broker1 and broker3 may appear as a shedding source
+ }
+ }
+
+ // assert that all the bundles from broker1 are shed to broker2, and
all the bundles from broker3 are shed to broker4
+ for (String bundle : bundlesToUnload.values()) {
+ BundleData bundleData = loadData.getBundleData().get(bundle);
+ if (bundle.startsWith("bundle1-")) {
+
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(),
bundleData, loadData, conf).get(), "broker2");
+ } else if (bundle.startsWith("bundle3-")) {
+
assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(),
bundleData, loadData, conf).get(), "broker4");
+ } else {
+ fail(); // every shed bundle must be one of the bundle1-* / bundle3-* bundles created above
+ }
+ }
+ }
+}