This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 222e69a53ff [improve][broker] PIP-192 Improve TransferShedder for
overload outlier for large clusters (#20059)
222e69a53ff is described below
commit 222e69a53ffc098115251253a2bc4fe8542ce55b
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Apr 11 01:59:04 2023 -0700
[improve][broker] PIP-192 Improve TransferShedder for overload outlier for
large clusters (#20059)
---
.../extensions/scheduler/TransferShedder.java | 69 +++++++++++++------
.../extensions/scheduler/TransferShedderTest.java | 78 ++++++++++++++++++++++
2 files changed, 126 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 98e05296d60..07d521a28af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -291,6 +291,10 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
return brokersSortedByLoad.get(minBrokerIndex).getKey();
}
+ String peekMaxBroker() {
+ return brokersSortedByLoad.get(maxBrokerIndex).getKey();
+ }
+
String pollMaxBroker() {
return brokersSortedByLoad.get(maxBrokerIndex--).getKey();
}
@@ -358,7 +362,9 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
- if (stats.std() > targetStd || isUnderLoaded(context,
stats.peekMinBroker(), stats.avg)) {
+ if (stats.std() > targetStd
+ || isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
+ || isOverLoaded(context, stats.peekMaxBroker(),
stats.avg)) {
unloadConditionHitCount++;
} else {
unloadConditionHitCount = 0;
@@ -383,28 +389,36 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
break;
}
UnloadDecision.Reason reason;
- if (stats.std() <= targetStd) {
- if (!isUnderLoaded(context, stats.peekMinBroker(),
stats.avg)) {
- if (debugMode) {
- log.info(CANNOT_CONTINUE_UNLOAD_MSG
- + "The overall cluster load meets
the target, std:{} <= targetStd:{},"
- + " and minBroker:{} is not
underloaded.",
- stats.std(), targetStd,
stats.peekMinBroker());
- }
- break;
- } else {
- reason = Underloaded;
- if (debugMode) {
- log.info(String.format("broker:%s is
underloaded:%s although "
- + "load std:%.2f <=
targetStd:%.2f. "
- + "Continuing unload for this
underloaded broker.",
- stats.peekMinBroker(),
-
context.brokerLoadDataStore().get(stats.peekMinBroker()).get(),
- stats.std(), targetStd));
- }
+ if (stats.std() > targetStd) {
+ reason = Overloaded;
+ } else if (isUnderLoaded(context, stats.peekMinBroker(),
stats.avg)) {
+ reason = Underloaded;
+ if (debugMode) {
+ log.info(String.format("broker:%s is underloaded:%s
although "
+ + "load std:%.2f <= targetStd:%.2f. "
+ + "Continuing unload for this
underloaded broker.",
+ stats.peekMinBroker(),
+
context.brokerLoadDataStore().get(stats.peekMinBroker()).get(),
+ stats.std(), targetStd));
}
- } else {
+ } else if (isOverLoaded(context, stats.peekMaxBroker(),
stats.avg)) {
reason = Overloaded;
+ if (debugMode) {
+ log.info(String.format("broker:%s is overloaded:%s
although "
+ + "load std:%.2f <= targetStd:%.2f. "
+ + "Continuing unload for this
overloaded broker.",
+ stats.peekMaxBroker(),
+
context.brokerLoadDataStore().get(stats.peekMaxBroker()).get(),
+ stats.std(), targetStd));
+ }
+ } else {
+ if (debugMode) {
+ log.info(CANNOT_CONTINUE_UNLOAD_MSG
+ + "The overall cluster load meets the
target, std:{} <= targetStd:{}."
+ + "minBroker:{} is not underloaded.
maxBroker:{} is not overloaded.",
+ stats.std(), targetStd, stats.peekMinBroker(),
stats.peekMaxBroker());
+ }
+ break;
}
String maxBroker = stats.pollMaxBroker();
@@ -671,6 +685,19 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
}
+ private boolean isOverLoaded(LoadManagerContext context, String broker,
double avgLoad) {
+ var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
+ if (brokerLoadDataOptional.isEmpty()) {
+ return false;
+ }
+ var conf = context.brokerConfiguration();
+ var overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+ var targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
+ var brokerLoadData = brokerLoadDataOptional.get();
+ var load = brokerLoadData.getWeightedMaxEMA();
+ return load > overloadThreshold && load > avgLoad + targetStd;
+ }
+
private boolean isTransferable(LoadManagerContext context,
Map<String, BrokerLookupData>
availableBrokers,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 3279cb4e475..d673722d6fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -200,6 +200,54 @@ public class TransferShedderTest {
return ctx;
}
+ public LoadManagerContext setupContextLoadSkewedOverload(int clusterSize) {
+ var ctx = getContext();
+
+ var brokerLoadDataStore = ctx.brokerLoadDataStore();
+ var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+ int i = 0;
+ for (; i < clusterSize-1; i++) {
+ int brokerLoad = 1;
+ topBundlesLoadDataStore.pushAsync("broker" + i,
getTopBundlesLoad("my-tenant/my-namespace" + i,
+ 300_000, 700_000));
+ brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,
brokerLoad, "broker" + i));
+ }
+ int brokerLoad = 100;
+ topBundlesLoadDataStore.pushAsync("broker" + i,
getTopBundlesLoad("my-tenant/my-namespace" + i,
+ 30_000_000, 70_000_000));
+ brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,
brokerLoad, "broker" + i));
+
+ return ctx;
+ }
+
+ public LoadManagerContext setupContextLoadSkewedUnderload(int clusterSize)
{
+ var ctx = getContext();
+
+ var brokerLoadDataStore = ctx.brokerLoadDataStore();
+ var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+ int i = 0;
+ for (; i < clusterSize-2; i++) {
+ int brokerLoad = 98;
+ topBundlesLoadDataStore.pushAsync("broker" + i,
getTopBundlesLoad("my-tenant/my-namespace" + i,
+ 30_000_000, 70_000_000));
+ brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,
brokerLoad, "broker" + i));
+ }
+
+ int brokerLoad = 99;
+ topBundlesLoadDataStore.pushAsync("broker" + i,
getTopBundlesLoad("my-tenant/my-namespace" + i,
+ 30_000_000, 70_000_000));
+ brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,
brokerLoad, "broker" + i));
+ i++;
+
+ brokerLoad = 1;
+ topBundlesLoadDataStore.pushAsync("broker" + i,
getTopBundlesLoad("my-tenant/my-namespace" + i,
+ 300_000, 700_000));
+ brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,
brokerLoad, "broker" + i));
+ return ctx;
+ }
+
public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load, String
broker) {
var loadData = new BrokerLoadData();
SystemResourceUsage usage1 = new SystemResourceUsage();
@@ -1176,6 +1224,36 @@ public class TransferShedderTest {
}
}
+ @Test
+ public void testOverloadOutlier() {
+ UnloadCounter counter = new UnloadCounter();
+ TransferShedder transferShedder = new TransferShedder(counter);
+ var ctx = setupContextLoadSkewedOverload(100);
+ var res = transferShedder.findBundlesForUnloading(ctx, Map.of(),
Map.of());
+ var expected = new HashSet<UnloadDecision>();
+ expected.add(new UnloadDecision(
+ new Unload("broker99",
"my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+ Optional.of("broker52")), Success, Overloaded));
+ assertEquals(res, expected);
+ assertEquals(counter.getLoadAvg(), 0.019900000000000008);
+ assertEquals(counter.getLoadStd(), 0.09850375627355534);
+ }
+
+ @Test
+ public void testUnderloadOutlier() {
+ UnloadCounter counter = new UnloadCounter();
+ TransferShedder transferShedder = new TransferShedder(counter);
+ var ctx = setupContextLoadSkewedUnderload(100);
+ var res = transferShedder.findBundlesForUnloading(ctx, Map.of(),
Map.of());
+ var expected = new HashSet<UnloadDecision>();
+ expected.add(new UnloadDecision(
+ new Unload("broker98",
"my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
+ Optional.of("broker99")), Success, Underloaded));
+ assertEquals(res, expected);
+ assertEquals(counter.getLoadAvg(), 0.9704000000000005);
+ assertEquals(counter.getLoadStd(), 0.09652895938523735);
+ }
+
@Test
public void testRandomLoadStats() {
int numBrokers = 10;