This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fc86f3b5f06 [fix][broker] fix UniformLoadShedder seleet wrong
overloadbroker and underloadbroker (#21025)
fc86f3b5f06 is described below
commit fc86f3b5f066bc71840d42d1163e85f048216aa7
Author: AloysZhang <[email protected]>
AuthorDate: Tue Sep 12 16:25:42 2023 +0800
[fix][broker] fix UniformLoadShedder seleet wrong overloadbroker and
underloadbroker (#21025)
---
.../loadbalance/impl/UniformLoadShedder.java | 163 +++++++++++++--------
.../loadbalance/impl/UniformLoadShedderTest.java | 78 ++++++++++
2 files changed, 178 insertions(+), 63 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index b92af5b7c69..e3055246f4b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
@@ -36,7 +36,7 @@ import
org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
/**
* This strategy tends to distribute load uniformly across all brokers. This
strategy checks load difference between
- * broker with highest load and broker with lowest load. If the difference is
higher than configured thresholds
+ * broker with the highest load and broker with the lowest load. If the
difference is higher than configured thresholds
* {@link
ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} or
* {@link
ServiceConfiguration#getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
then it finds out
* bundles which can be unloaded to distribute traffic evenly across all
brokers.
@@ -63,25 +63,37 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
Map<String, BundleData> loadBundleData =
loadData.getBundleDataForLoadShedding();
Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
- MutableObject<String> overloadedBroker = new MutableObject<>();
- MutableObject<String> underloadedBroker = new MutableObject<>();
+ MutableObject<String> msgRateOverloadedBroker = new MutableObject<>();
+ MutableObject<String> msgThroughputOverloadedBroker = new
MutableObject<>();
+ MutableObject<String> msgRateUnderloadedBroker = new MutableObject<>();
+ MutableObject<String> msgThroughputUnderloadedBroker = new
MutableObject<>();
MutableDouble maxMsgRate = new MutableDouble(-1);
- MutableDouble maxThroughputRate = new MutableDouble(-1);
+ MutableDouble maxThroughput = new MutableDouble(-1);
MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
- MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);
+ MutableDouble minThroughput = new MutableDouble(Integer.MAX_VALUE);
+
brokersData.forEach((broker, data) -> {
double msgRate = data.getLocalData().getMsgRateIn() +
data.getLocalData().getMsgRateOut();
double throughputRate = data.getLocalData().getMsgThroughputIn()
+ data.getLocalData().getMsgThroughputOut();
- if (msgRate > maxMsgRate.getValue() || throughputRate >
maxThroughputRate.getValue()) {
- overloadedBroker.setValue(broker);
+ if (msgRate > maxMsgRate.getValue()) {
+ msgRateOverloadedBroker.setValue(broker);
maxMsgRate.setValue(msgRate);
- maxThroughputRate.setValue(throughputRate);
}
- if (msgRate < minMsgRate.getValue() || throughputRate <
minThroughputRate.getValue()) {
- underloadedBroker.setValue(broker);
+
+ if (throughputRate > maxThroughput.getValue()) {
+ msgThroughputOverloadedBroker.setValue(broker);
+ maxThroughput.setValue(throughputRate);
+ }
+
+ if (msgRate < minMsgRate.getValue()) {
+ msgRateUnderloadedBroker.setValue(broker);
minMsgRate.setValue(msgRate);
- minThroughputRate.setValue(throughputRate);
+ }
+
+ if (throughputRate < minThroughput.getValue()) {
+ msgThroughputUnderloadedBroker.setValue(broker);
+ minThroughput.setValue(throughputRate);
}
});
@@ -91,12 +103,12 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
if (minMsgRate.getValue() <= EPS && minMsgRate.getValue() >= -EPS) {
minMsgRate.setValue(1.0);
}
- if (minThroughputRate.getValue() <= EPS &&
minThroughputRate.getValue() >= -EPS) {
- minThroughputRate.setValue(1.0);
+ if (minThroughput.getValue() <= EPS && minThroughput.getValue() >=
-EPS) {
+ minThroughput.setValue(1.0);
}
double msgRateDifferencePercentage = ((maxMsgRate.getValue() -
minMsgRate.getValue()) * 100)
/ (minMsgRate.getValue());
- double msgThroughputDifferenceRate = maxThroughputRate.getValue() /
minThroughputRate.getValue();
+ double msgThroughputDifferenceRate = maxThroughput.getValue() /
minThroughput.getValue();
// if the threshold matches then find out how much load needs to be
unloaded by considering number of msgRate
// and throughput.
@@ -105,66 +117,91 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
boolean isMsgThroughputThresholdExceeded = conf
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0
&& msgThroughputDifferenceRate > conf
-
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
+
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
- if (log.isDebugEnabled()) {
- log.debug(
- "Found bundles for uniform load balancing. "
- + "overloaded broker {} with
(msgRate,throughput)= ({},{}) "
- + "and underloaded broker {} with
(msgRate,throughput)= ({},{})",
- overloadedBroker.getValue(), maxMsgRate.getValue(),
maxThroughputRate.getValue(),
- underloadedBroker.getValue(), minMsgRate.getValue(),
minThroughputRate.getValue());
- }
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) *
conf.getMaxUnloadPercentage()));
MutableInt msgThroughputRequiredFromUnloadedBundles = new
MutableInt(
- (int) ((maxThroughputRate.getValue() -
minThroughputRate.getValue())
+ (int) ((maxThroughput.getValue() -
minThroughput.getValue())
* conf.getMaxUnloadPercentage()));
- LocalBrokerData overloadedBrokerData =
brokersData.get(overloadedBroker.getValue()).getLocalData();
-
- if (overloadedBrokerData.getBundles().size() > 1
- && (msgRateRequiredFromUnloadedBundles.getValue() >=
conf.getMinUnloadMessage()
- || msgThroughputRequiredFromUnloadedBundles.getValue() >=
conf.getMinUnloadMessageThroughput())) {
- // Sort bundles by throughput, then pick the bundle which can
help to reduce load uniformly with
- // under-loaded broker
- loadBundleData.entrySet().stream()
- .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
- .map((e) -> {
- String bundle = e.getKey();
- BundleData bundleData = e.getValue();
- TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- double throughput = isMsgRateThresholdExceeded
- ? shortTermData.getMsgRateIn() +
shortTermData.getMsgRateOut()
- : shortTermData.getMsgThroughputIn() +
shortTermData.getMsgThroughputOut();
- return Triple.of(bundle, bundleData, throughput);
- }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
- .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
- if (conf.getMaxUnloadBundleNumPerShedding() != -1
- && selectedBundlesCache.size() >=
conf.getMaxUnloadBundleNumPerShedding()) {
- return;
- }
- String bundle = e.getLeft();
- BundleData bundleData = e.getMiddle();
- TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- double throughput =
shortTermData.getMsgThroughputIn()
- + shortTermData.getMsgThroughputOut();
- double bundleMsgRate =
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
- if (isMsgRateThresholdExceeded) {
+ if (isMsgRateThresholdExceeded) {
+ if (log.isDebugEnabled()) {
+ log.debug("Found bundles for uniform load balancing. "
+ + "msgRate overloaded broker: {} with
msgRate: {}, "
+ + "msgRate underloaded broker: {} with
msgRate: {}",
+ msgRateOverloadedBroker.getValue(),
maxMsgRate.getValue(),
+ msgRateUnderloadedBroker.getValue(),
minMsgRate.getValue());
+ }
+ LocalBrokerData overloadedBrokerData =
+
brokersData.get(msgRateOverloadedBroker.getValue()).getLocalData();
+ if (overloadedBrokerData.getBundles().size() > 1
+ && (msgRateRequiredFromUnloadedBundles.getValue() >=
conf.getMinUnloadMessage())) {
+ // Sort bundles by throughput, then pick the bundle which
can help to reduce load uniformly with
+ // under-loaded broker
+ loadBundleData.entrySet().stream()
+ .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
+ .map((e) -> {
+ String bundle = e.getKey();
+ TimeAverageMessageData shortTermData =
e.getValue().getShortTermData();
+ double msgRate = shortTermData.getMsgRateIn()
+ shortTermData.getMsgRateOut();
+ return Pair.of(bundle, msgRate);
+ }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+ .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
+ if (conf.getMaxUnloadBundleNumPerShedding() !=
-1
+ && selectedBundlesCache.size() >=
conf.getMaxUnloadBundleNumPerShedding()) {
+ return;
+ }
+ String bundle = e.getLeft();
+ double bundleMsgRate = e.getRight();
if (bundleMsgRate <=
(msgRateRequiredFromUnloadedBundles.getValue()
+ 1000/* delta */)) {
log.info("Found bundle to unload with
msgRate {}", bundleMsgRate);
msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
-
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+
selectedBundlesCache.put(msgRateOverloadedBroker.getValue(), bundle);
}
- } else {
- if (throughput <=
(msgThroughputRequiredFromUnloadedBundles.getValue())) {
- log.info("Found bundle to unload with
throughput {}", throughput);
-
msgThroughputRequiredFromUnloadedBundles.add(-throughput);
-
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+ });
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Found bundles for uniform load balancing. "
+ + "msgThroughput overloaded broker: {}
with msgThroughput {}, "
+ + "msgThroughput underloaded broker: {}
with msgThroughput: {}",
+ msgThroughputOverloadedBroker.getValue(),
maxThroughput.getValue(),
+ msgThroughputUnderloadedBroker.getValue(),
minThroughput.getValue());
+ }
+ LocalBrokerData overloadedBrokerData =
+
brokersData.get(msgThroughputOverloadedBroker.getValue()).getLocalData();
+ if (overloadedBrokerData.getBundles().size() > 1
+ &&
+ msgThroughputRequiredFromUnloadedBundles.getValue() >=
conf.getMinUnloadMessageThroughput()) {
+ // Sort bundles by throughput, then pick the bundle which
can help to reduce load uniformly with
+ // under-loaded broker
+ loadBundleData.entrySet().stream()
+ .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
+ .map((e) -> {
+ String bundle = e.getKey();
+ TimeAverageMessageData shortTermData =
e.getValue().getShortTermData();
+ double msgThroughtput =
shortTermData.getMsgThroughputIn()
+ + shortTermData.getMsgThroughputOut();
+ return Pair.of(bundle, msgThroughtput);
+ }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+ .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
+ if (conf.getMaxUnloadBundleNumPerShedding() !=
-1
+ && selectedBundlesCache.size() >=
conf.getMaxUnloadBundleNumPerShedding()) {
+ return;
}
- }
- });
+ String bundle = e.getLeft();
+ double msgThroughput = e.getRight();
+ if (msgThroughput <=
(msgThroughputRequiredFromUnloadedBundles.getValue()
+ + 1000/* delta */)) {
+ log.info("Found bundle to unload with
msgThroughput {}", msgThroughput);
+
msgThroughputRequiredFromUnloadedBundles.add(-msgThroughput);
+
selectedBundlesCache.put(msgThroughputOverloadedBroker.getValue(), bundle);
+ }
+ });
+ }
+
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
index 00182fffb8a..4b4042cf31a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
@Test(groups = "broker")
public class UniformLoadShedderTest {
@@ -119,4 +120,81 @@ public class UniformLoadShedderTest {
assertFalse(bundlesToUnload.isEmpty());
}
+ @Test
+ public void testOverloadBrokerSelect() {
+ conf.setMaxUnloadBundleNumPerShedding(1);
+ conf.setMaxUnloadPercentage(0.5);
+ int numBrokers = 5;
+ int numBundles = 5;
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData[] localBrokerDatas = new LocalBrokerData[]{
+ new LocalBrokerData(),
+ new LocalBrokerData(),
+ new LocalBrokerData(),
+ new LocalBrokerData(),
+ new LocalBrokerData()};
+
+ String[] brokerNames = new String[]{"broker0", "broker1", "broker2",
"broker3", "broker4"};
+
+ double[] brokerMsgRates = new double[]{
+ 50000, // broker0
+ 60000, // broker1
+ 70000, // broker2
+ 10000, // broker3
+ 20000};// broker4
+
+ double[] brokerMsgThroughputs = new double[]{
+ 50 * 1024 * 1024, // broker0
+ 60 * 1024 * 1024, // broker1
+ 70 * 1024 * 1024, // broker2
+ 80 * 1024 * 1024, // broker3
+ 10 * 1024 * 1024};// broker4
+
+
+ for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+ double msgRate = brokerMsgRates[brokerId] / numBundles;
+ double throughput = brokerMsgThroughputs[brokerId] / numBundles;
+ for (int i = 0; i < numBundles; ++i) {
+ String bundleName = "broker-" + brokerId + "-bundle-" + i;
+ localBrokerDatas[brokerId].getBundles().add(bundleName);
+
localBrokerDatas[brokerId].setMsgRateIn(brokerMsgRates[brokerId]);
+
localBrokerDatas[brokerId].setMsgThroughputIn(brokerMsgThroughputs[brokerId]);
+ BundleData bundle = new BundleData();
+
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgRateIn(msgRate);
+ timeAverageMessageData.setMsgThroughputIn(throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put(bundleName, bundle);
+ }
+ loadData.getBrokerData().put(brokerNames[brokerId], new
BrokerData(localBrokerDatas[brokerId]));
+ }
+
+ // disable throughput based load shedding, enable rate based load
shedding only
+ conf.setLoadBalancerMsgRateDifferenceShedderThreshold(50);
+
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(0);
+
+ Multimap<String, String> bundlesToUnload =
uniformLoadShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(), 1);
+ assertTrue(bundlesToUnload.containsKey("broker2"));
+
+
+ // disable rate based load shedding, enable throughput based load
shedding only
+ conf.setLoadBalancerMsgRateDifferenceShedderThreshold(0);
+
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(2);
+
+ bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData,
conf);
+ assertEquals(bundlesToUnload.size(), 1);
+ assertTrue(bundlesToUnload.containsKey("broker3"));
+
+ // enable both rate and throughput based load shedding, but rate based
load shedding has higher priority
+ conf.setLoadBalancerMsgRateDifferenceShedderThreshold(50);
+
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(2);
+
+ bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData,
conf);
+ assertEquals(bundlesToUnload.size(), 1);
+ assertTrue(bundlesToUnload.containsKey("broker2"));
+ }
+
}