This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new e90f0925bb1 [feat][broker] Support lower boundary shedding for
ThresholdShedder (#17456)
e90f0925bb1 is described below
commit e90f0925bb182a777e1a308d1d0781813024d479
Author: feynmanlin <[email protected]>
AuthorDate: Fri Sep 23 09:19:12 2022 +0800
[feat][broker] Support lower boundary shedding for ThresholdShedder (#17456)
Support lower boundary shedding for ThresholdShedder (#17456)
The existing ThresholdShedder has the following problems, for example:
There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is
72.73 + 10 = 82.73.
Since 80 < 82.73, unload will not be trigger, and there is one idle Broker
with load of 0%.
On the basis of ThresholdShedder, we adds the lower boundary judgment of
the load.
When 【current usage < average usage - threshold】, the broker with the
highest load will be triggered to unload
---
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++
.../broker/loadbalance/impl/ThresholdShedder.java | 113 ++++++++++++++++-----
.../loadbalance/impl/ThresholdShedderTest.java | 111 ++++++++++++++++++++
3 files changed, 207 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 06e193a4131..5942ee9a8e9 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2074,6 +2074,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String loadBalancerLoadSheddingStrategy =
"org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "When [current usage < average usage - threshold], "
+ + "the broker with the highest load will be triggered to
unload"
+ )
+ private boolean lowerBoundarySheddingEnabled = false;
+
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "load balance placement strategy"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 05c69062663..928f045369c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -52,6 +52,9 @@ public class ThresholdShedder implements LoadSheddingStrategy
{
private static final Logger log =
LoggerFactory.getLogger(ThresholdShedder.class);
private final Multimap<String, String> selectedBundlesCache =
ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
+
+ private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;
+
private static final double MB = 1024 * 1024;
private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
@@ -80,8 +83,7 @@ public class ThresholdShedder implements LoadSheddingStrategy
{
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold =
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
- final double avgUsage = getBrokerAvgUsage(
- loadData, conf.getLoadBalancerHistoryResourcePercentage(),
conf, sampleLog);
+ final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
if (sampleLog) {
log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
}
@@ -122,17 +124,34 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
broker, 100 * currentUsage, 100 * avgUsage, 100 *
threshold, minimumThroughputToOffload / MB,
(brokerCurrentThroughput - minimumThroughputToOffload) /
MB);
- MutableDouble trafficMarkedToOffload = new MutableDouble(0);
- MutableBoolean atLeastOneBundleSelected = new
MutableBoolean(false);
-
if (localData.getBundles().size() > 1) {
- loadData.getBundleDataForLoadShedding().entrySet().stream()
- .map((e) -> {
- String bundle = e.getKey();
- BundleData bundleData = e.getValue();
- TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- double throughput = shortTermData.getMsgThroughputIn()
+ shortTermData.getMsgThroughputOut();
- return Pair.of(bundle, throughput);
+ filterAndSelectBundle(loadData, recentlyUnloadedBundles,
broker, localData, minimumThroughputToOffload);
+ } else if (localData.getBundles().size() == 1) {
+ log.warn(
+ "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
+ + "No Load Shedding will be done on this
broker",
+ localData.getBundles().iterator().next(), broker);
+ } else {
+ log.warn("Broker {} is overloaded despite having no bundles",
broker);
+ }
+ });
+ if (selectedBundlesCache.isEmpty() &&
conf.isLowerBoundarySheddingEnabled()) {
+ tryLowerBoundaryShedding(loadData, conf);
+ }
+ return selectedBundlesCache;
+ }
+
+ private void filterAndSelectBundle(LoadData loadData, Map<String, Long>
recentlyUnloadedBundles, String broker,
+ LocalBrokerData localData, double
minimumThroughputToOffload) {
+ MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+ MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+ loadData.getBundleDataForLoadShedding().entrySet().stream()
+ .map((e) -> {
+ String bundle = e.getKey();
+ BundleData bundleData = e.getValue();
+ TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
+ double throughput = shortTermData.getMsgThroughputIn() +
shortTermData.getMsgThroughputOut();
+ return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
@@ -147,21 +166,11 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
atLeastOneBundleSelected.setTrue();
}
});
- } else if (localData.getBundles().size() == 1) {
- log.warn(
- "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
- + "No Load Shedding will be done on this
broker",
- localData.getBundles().iterator().next(), broker);
- } else {
- log.warn("Broker {} is overloaded despite having no bundles",
broker);
- }
- });
-
- return selectedBundlesCache;
}
- private double getBrokerAvgUsage(final LoadData loadData, final double
historyPercentage,
+ private double getBrokerAvgUsage(final LoadData loadData,
final ServiceConfiguration conf, boolean
sampleLog) {
+ double historyPercentage =
conf.getLoadBalancerHistoryResourcePercentage();
double totalUsage = 0.0;
int totalBrokers = 0;
@@ -227,4 +236,60 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
return historyUsage;
}
+ private void tryLowerBoundaryShedding(LoadData loadData,
ServiceConfiguration conf) {
+ // Select the broker with the most resource usage.
+ final double threshold =
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+ final double avgUsage = getBrokerAvgUsage(loadData, conf,
canSampleLog());
+ Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold,
avgUsage);
+ boolean hasBrokerBelowLowerBound = result.getLeft();
+ String maxUsageBroker = result.getRight();
+ BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+ if (brokerData == null) {
+ log.info("Load data is null or bundle <=1, skipping bundle
unload.");
+ return;
+ }
+ if (!hasBrokerBelowLowerBound) {
+ log.info("No broker is below the lower bound, threshold is {}, "
+ + "avgUsage usage is {}, max usage of Broker {} is
{}",
+ threshold, avgUsage, maxUsageBroker,
+ brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+ return;
+ }
+ LocalBrokerData localData = brokerData.getLocalData();
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() +
localData.getMsgThroughputOut();
+ double minimumThroughputToOffload = brokerCurrentThroughput *
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+ double minThroughputThreshold =
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+ if (minThroughputThreshold > minimumThroughputToOffload) {
+ log.info("broker {} in lower boundary shedding is planning to shed
throughput {} MByte/s less than "
+ + "minimumThroughputThreshold {} MByte/s, skipping
bundle unload.",
+ maxUsageBroker, minimumThroughputToOffload / MB,
minThroughputThreshold / MB);
+ return;
+ }
+ filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(),
maxUsageBroker, localData,
+ minimumThroughputToOffload);
+ }
+
+ private Pair<Boolean, String> getMaxUsageBroker(
+ LoadData loadData, double threshold, double avgUsage) {
+ String maxUsageBrokerName = "";
+ double maxUsage = avgUsage - threshold;
+ boolean hasBrokerBelowLowerBound = false;
+ for (Map.Entry<String, BrokerData> entry :
loadData.getBrokerData().entrySet()) {
+ String broker = entry.getKey();
+ BrokerData brokerData = entry.getValue();
+ double currentUsage = brokerAvgResourceUsage.getOrDefault(broker,
0.0);
+ // Select the broker with the most resource usage.
+ if (currentUsage > maxUsage && brokerData.getLocalData() != null
+ && brokerData.getLocalData().getBundles().size() > 1) {
+ maxUsage = currentUsage;
+ maxUsageBrokerName = broker;
+ }
+ // Whether any brokers with low usage in the cluster.
+ if (currentUsage < avgUsage - threshold) {
+ hasBrokerBelowLowerBound = true;
+ }
+ }
+ return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
index 80760a7afc3..8461f8ce74c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
@@ -47,6 +47,7 @@ public class ThresholdShedderTest {
@BeforeMethod
public void setup() {
+ conf.setLowerBoundarySheddingEnabled(false);
thresholdShedder = new ThresholdShedder();
}
@@ -224,4 +225,114 @@ public class ThresholdShedderTest {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn:
30.00%, bandwidthOut: 20.00%");
}
+
+ @Test
+ public void testLowerBoundaryShedding() {
+ int numBundles = 10;
+ int brokerNum = 11;
+ int lowLoadNode = 10;
+ LoadData loadData = new LoadData();
+ double throughput = 100 * 1024 * 1024;
+ //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded
at 0%.
+ //At this time, the average load is 80*10/11 = 72.73, and the
threshold for rebalancing is 72.73 + 10 = 82.73.
+ //Since 80 < 82.73, rebalancing will not be trigger, and there is one
Broker with load of 0.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ for (int j = 0; j < numBundles; j++) {
+ broker.getBundles().add("bundle-" + j);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0
: throughput);
+ timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ?
0 : throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80,
100));
+ broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 :
80, 100));
+ broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+ broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ }
+
+ @Test
+ public void testLowerBoundarySheddingNoBrokerToOffload() {
+ int numBundles = 10;
+ int brokerNum = 11;
+ LoadData loadData = new LoadData();
+ double throughput = 80 * 1024 * 1024;
+ //Load of all Brokers are 80%, and no Broker needs to offload.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ for (int j = 0; j < numBundles; j++) {
+ broker.getBundles().add("bundle-" + j);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(throughput);
+ timeAverageMessageData.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(80, 100));
+ broker.setBandwidthOut(new ResourceUsage(80, 100));
+ broker.setMsgThroughputIn(throughput);
+ broker.setMsgThroughputOut(throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ }
+
+ @Test
+ public void testLowerBoundarySheddingBrokerWithOneBundle() {
+ int brokerNum = 11;
+ int lowLoadNode = 5;
+ int brokerWithManyBundles = 3;
+ LoadData loadData = new LoadData();
+ double throughput = 100 * 1024 * 1024;
+ //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded
at 0%.
+ //Only broker3 has 10 bundles.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ //Broker3 has 10 bundles
+ int numBundles = i == brokerWithManyBundles ? 10 : 1;
+ for (int j = 0; j < numBundles; j++) {
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0
: throughput);
+ timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ?
0 : throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80,
100));
+ broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 :
80, 100));
+ broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+ broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ assertEquals(bundlesToUnload.size(), 1);
+ assertTrue(bundlesToUnload.containsKey("broker-3"));
+ }
}