This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 33cc1ca82d1 [fix][broker]Backport fix UniformLoadShedder selecet wrong
overloadbroker and underloadbroker (#21182)
33cc1ca82d1 is described below
commit 33cc1ca82d10dfd8fb9e88163a342a3e5cf350ea
Author: AloysZhang <[email protected]>
AuthorDate: Tue Sep 19 18:08:19 2023 +0800
[fix][broker]Backport fix UniformLoadShedder selecet wrong overloadbroker
and underloadbroker (#21182)
---
.../loadbalance/impl/UniformLoadShedder.java | 151 +++++++++++++--------
1 file changed, 92 insertions(+), 59 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index df8e1b66509..dd51a17c186 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -36,9 +36,9 @@ import
org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
/**
* This strategy tends to distribute load uniformly across all brokers. This
strategy checks load difference between
- * broker with highest load and broker with lowest load. If the difference is
higher than configured thresholds
+ * broker with the highest load and broker with the lowest load. If the
difference is higher than configured thresholds
* {@link
ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} or
- * {@link
ServiceConfiguration#loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
then it finds out
+ * {@link
ServiceConfiguration#getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
then it finds out
* bundles which can be unloaded to distribute traffic evenly across all
brokers.
*
*/
@@ -68,12 +68,15 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
Map<String, BundleData> loadBundleData =
loadData.getBundleDataForLoadShedding();
Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
- MutableObject<String> overloadedBroker = new MutableObject<>();
- MutableObject<String> underloadedBroker = new MutableObject<>();
+ MutableObject<String> msgRateOverloadedBroker = new MutableObject<>();
+ MutableObject<String> msgThroughputOverloadedBroker = new
MutableObject<>();
+ MutableObject<String> msgRateUnderloadedBroker = new MutableObject<>();
+ MutableObject<String> msgThroughputUnderloadedBroker = new
MutableObject<>();
MutableDouble maxMsgRate = new MutableDouble(-1);
- MutableDouble maxThroughputRate = new MutableDouble(-1);
+ MutableDouble maxThroughput = new MutableDouble(-1);
MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
- MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);
+ MutableDouble minThroughput = new MutableDouble(Integer.MAX_VALUE);
+
brokersData.forEach((broker, data) -> {
//broker with one bundle can't be considered for bundle unloading
if (data.getLocalData().getBundles().size() <= 1) {
@@ -83,15 +86,24 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
double msgRate = data.getLocalData().getMsgRateIn() +
data.getLocalData().getMsgRateOut();
double throughputRate = data.getLocalData().getMsgThroughputIn()
+ data.getLocalData().getMsgThroughputOut();
- if (msgRate > maxMsgRate.getValue() || throughputRate >
maxThroughputRate.getValue()) {
- overloadedBroker.setValue(broker);
+ if (msgRate > maxMsgRate.getValue()) {
+ msgRateOverloadedBroker.setValue(broker);
maxMsgRate.setValue(msgRate);
- maxThroughputRate.setValue(throughputRate);
}
- if (msgRate < minMsgRate.getValue() || throughputRate <
minThroughputRate.getValue()) {
- underloadedBroker.setValue(broker);
+
+ if (throughputRate > maxThroughput.getValue()) {
+ msgThroughputOverloadedBroker.setValue(broker);
+ maxThroughput.setValue(throughputRate);
+ }
+
+ if (msgRate < minMsgRate.getValue()) {
+ msgRateUnderloadedBroker.setValue(broker);
minMsgRate.setValue(msgRate);
- minThroughputRate.setValue(throughputRate);
+ }
+
+ if (throughputRate < minThroughput.getValue()) {
+ msgThroughputUnderloadedBroker.setValue(broker);
+ minThroughput.setValue(throughputRate);
}
});
@@ -101,12 +113,12 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
if (minMsgRate.getValue() <= EPS && minMsgRate.getValue() >= -EPS) {
minMsgRate.setValue(1.0);
}
- if (minThroughputRate.getValue() <= EPS &&
minThroughputRate.getValue() >= -EPS) {
- minThroughputRate.setValue(1.0);
+ if (minThroughput.getValue() <= EPS && minThroughput.getValue() >=
-EPS) {
+ minThroughput.setValue(1.0);
}
double msgRateDifferencePercentage = ((maxMsgRate.getValue() -
minMsgRate.getValue()) * 100)
/ (minMsgRate.getValue());
- double msgThroughputDifferenceRate = maxThroughputRate.getValue() /
minThroughputRate.getValue();
+ double msgThroughputDifferenceRate = maxThroughput.getValue() /
minThroughput.getValue();
// if the threshold matches then find out how much load needs to be
unloaded by considering number of msgRate
// and throughput.
@@ -118,58 +130,79 @@ public class UniformLoadShedder implements
LoadSheddingStrategy {
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
- if (log.isDebugEnabled()) {
- log.debug(
- "Found bundles for uniform load balancing. "
- + "overloaded broker {} with
(msgRate,throughput)= ({},{}) "
- + "and underloaded broker {} with
(msgRate,throughput)= ({},{})",
- overloadedBroker.getValue(), maxMsgRate.getValue(),
maxThroughputRate.getValue(),
- underloadedBroker.getValue(), minMsgRate.getValue(),
minThroughputRate.getValue());
- }
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) *
MAX_UNLOAD_PERCENTAGE));
MutableInt msgThroughputRequiredFromUnloadedBundles = new
MutableInt(
- (int) ((maxThroughputRate.getValue() -
minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
- LocalBrokerData overloadedBrokerData =
brokersData.get(overloadedBroker.getValue()).getLocalData();
-
- if (overloadedBrokerData.getBundles().size() > 1
- && (msgRateRequiredFromUnloadedBundles.getValue() >=
MIN_UNLOAD_MESSAGE
- || msgThroughputRequiredFromUnloadedBundles.getValue() >=
MIN_UNLOAD_THROUGHPUT)) {
- // Sort bundles by throughput, then pick the bundle which can
help to reduce load uniformly with
- // under-loaded broker
- loadBundleData.entrySet().stream()
- .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
- .map((e) -> {
- String bundle = e.getKey();
- BundleData bundleData = e.getValue();
- TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- double throughput = isMsgRateThresholdExceeded
- ? shortTermData.getMsgRateIn() +
shortTermData.getMsgRateOut()
- : shortTermData.getMsgThroughputIn() +
shortTermData.getMsgThroughputOut();
- return Triple.of(bundle, bundleData, throughput);
- }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
- .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
- String bundle = e.getLeft();
- BundleData bundleData = e.getMiddle();
- TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- double throughput =
shortTermData.getMsgThroughputIn()
- + shortTermData.getMsgThroughputOut();
- double bundleMsgRate =
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
- if (isMsgRateThresholdExceeded) {
+ (int) ((maxThroughput.getValue() -
maxThroughput.getValue()) * MAX_UNLOAD_PERCENTAGE));
+ if (isMsgRateThresholdExceeded) {
+ if (log.isDebugEnabled()) {
+ log.debug("Found bundles for uniform load balancing. "
+ + "msgRate overloaded broker: {} with
msgRate: {}, "
+ + "msgRate underloaded broker: {} with
msgRate: {}",
+ msgRateOverloadedBroker.getValue(),
maxMsgRate.getValue(),
+ msgRateUnderloadedBroker.getValue(),
minMsgRate.getValue());
+ }
+ LocalBrokerData overloadedBrokerData =
+
brokersData.get(msgRateOverloadedBroker.getValue()).getLocalData();
+ if (overloadedBrokerData.getBundles().size() > 1
+ && (msgRateRequiredFromUnloadedBundles.getValue() >=
MIN_UNLOAD_MESSAGE)) {
+ // Sort bundles by throughput, then pick the bundle which
can help to reduce load uniforml y with
+ // under-loaded broker
+ loadBundleData.entrySet().stream()
+ .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
+ .map((e) -> {
+ String bundle = e.getKey();
+ TimeAverageMessageData shortTermData =
e.getValue().getShortTermData();
+ double msgRate = shortTermData.getMsgRateIn()
+ shortTermData.getMsgRateOut();
+ return Pair.of(bundle, msgRate);
+ }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+ .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
+ String bundle = e.getLeft();
+ double bundleMsgRate = e.getRight();
if (bundleMsgRate <=
(msgRateRequiredFromUnloadedBundles.getValue()
+ 1000/* delta */)) {
log.info("Found bundle to unload with
msgRate {}", bundleMsgRate);
msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
-
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+
selectedBundlesCache.put(msgRateOverloadedBroker.getValue(), bundle);
}
- } else {
- if (throughput <=
(msgThroughputRequiredFromUnloadedBundles.getValue())) {
- log.info("Found bundle to unload with
throughput {}", throughput);
-
msgThroughputRequiredFromUnloadedBundles.add(-throughput);
-
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+ });
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Found bundles for uniform load balancing. "
+ + "msgThroughput overloaded broker: {}
with msgThroughput {}, "
+ + "msgThroughput underloaded broker: {}
with msgThroughput: {}",
+ msgThroughputOverloadedBroker.getValue(),
maxThroughput.getValue(),
+ msgThroughputUnderloadedBroker.getValue(),
minThroughput.getValue());
+ }
+ LocalBrokerData overloadedBrokerData =
+
brokersData.get(msgThroughputOverloadedBroker.getValue()).getLocalData();
+ if (overloadedBrokerData.getBundles().size() > 1
+ &&
+ msgThroughputRequiredFromUnloadedBundles.getValue() >=
MIN_UNLOAD_THROUGHPUT) {
+ // Sort bundles by throughput, then pick the bundle which
can help to reduce load uniformly with
+ // under-loaded broker
+ loadBundleData.entrySet().stream()
+ .filter(e ->
overloadedBrokerData.getBundles().contains(e.getKey()))
+ .map((e) -> {
+ String bundle = e.getKey();
+ TimeAverageMessageData shortTermData =
e.getValue().getShortTermData();
+ double msgThroughput =
shortTermData.getMsgThroughputIn()
+ + shortTermData.getMsgThroughputOut();
+ return Pair.of(bundle, msgThroughput);
+ }).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+ .sorted((e1, e2) -> Double.compare(e2.getRight(),
e1.getRight())).forEach((e) -> {
+ String bundle = e.getLeft();
+ double msgThroughput = e.getRight();
+ if (msgThroughput <=
(msgThroughputRequiredFromUnloadedBundles.getValue()
+ + 1000/* delta */)) {
+ log.info("Found bundle to unload with
msgThroughput {}", msgThroughput);
+
msgThroughputRequiredFromUnloadedBundles.add(-msgThroughput);
+
selectedBundlesCache.put(msgThroughputOverloadedBroker.getValue(), bundle);
}
- }
- });
+ });
+ }
+
}
}