codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r972048793
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn:
30.00%, bandwidthOut: 20.00%");
}
+
+ @Test
+ public void testRangeThroughput() {
Review Comment:
```suggestion
public void testLowerBoundaryShedding() {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker,
LocalBrokerData localBroker
return historyUsage;
}
+ private void tryLowerBoundaryShedding(LoadData loadData,
ServiceConfiguration conf) {
+ // Select the broker with the most resource usage.
+ final double threshold =
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+ final double avgUsage = getBrokerAvgUsage(loadData, conf,
canSampleLog());
+ Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold,
avgUsage);
+ boolean hasBrokerBelowLowerBound = result.getLeft();
+ String maxUsageBroker = result.getRight();
+ BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+ if (brokerData == null) {
+ log.info("Load data is null or bundle <=1, skipping bundle
unload.");
+ return;
+ }
+ if (!hasBrokerBelowLowerBound) {
+ log.info("No broker is below the lower bound, threshold is {}, "
+ + "avgUsage usage is {}, max usage of Broker {} is
{}",
+ threshold, avgUsage, maxUsageBroker,
+ brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+ return;
+ }
+ LocalBrokerData localData = brokerData.getLocalData();
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() +
localData.getMsgThroughputOut();
+ double minimumThroughputToOffload = brokerCurrentThroughput *
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+ double minThroughputThreshold =
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+ if (minThroughputThreshold > minimumThroughputToOffload) {
+ log.info("broker {} in lower boundary shedding is planning to shed
throughput {} MByte/s less than "
+ + "minimumThroughputThreshold {} MByte/s, skipping
bundle unload.",
+ maxUsageBroker, minimumThroughputToOffload / MB,
minThroughputThreshold / MB);
+ return;
+ }
+ filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(),
maxUsageBroker, localData,
+ minimumThroughputToOffload);
+ }
+
+ private Pair<Boolean, String> getMaxUsageBroker(
+ LoadData loadData, double threshold, double avgUsage) {
+ String maxUsageBrokerName = "";
+ double maxUsage = avgUsage - threshold;
Review Comment:
The max usage broker's load is > (avgUsage - threshold).
The load of a broker below the lower bound is < (avgUsage - threshold).
After the max usage broker unloads bundles to the lower broker, the max usage
broker might become the lower broker, and the lower broker might become the
max usage broker.
Could this lead to frequent bundle unloading?
It looks like we need to change
```java
double minimumThroughputToOffload = brokerCurrentThroughput * threshold *
LOWER_BOUNDARY_THRESHOLD_MARGIN;
```
to
```java
double minimumThroughputToOffload = Math.min(brokerCurrentThroughput *
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN, brokerCurrentThroughput - avgUsage
- threshold);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker,
LocalBrokerData localBroker
return historyUsage;
}
+ private void tryLowerBoundaryShedding(LoadData loadData,
ServiceConfiguration conf) {
+ // Select the broker with the most resource usage.
+ final double threshold =
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+ final double avgUsage = getBrokerAvgUsage(loadData, conf,
canSampleLog());
+ Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold,
avgUsage);
+ boolean hasBrokerBelowLowerBound = result.getLeft();
+ String maxUsageBroker = result.getRight();
+ BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+ if (brokerData == null) {
+ log.info("Load data is null or bundle <=1, skipping bundle
unload.");
+ return;
+ }
+ if (!hasBrokerBelowLowerBound) {
+ log.info("No broker is below the lower bound, threshold is {}, "
+ + "avgUsage usage is {}, max usage of Broker {} is
{}",
+ threshold, avgUsage, maxUsageBroker,
+ brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+ return;
+ }
+ LocalBrokerData localData = brokerData.getLocalData();
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() +
localData.getMsgThroughputOut();
+ double minimumThroughputToOffload = brokerCurrentThroughput *
threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+ double minThroughputThreshold =
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+ if (minThroughputThreshold > minimumThroughputToOffload) {
+ log.info("broker {} in lower boundary shedding is planning to shed
throughput {} MByte/s less than "
+ + "minimumThroughputThreshold {} MByte/s, skipping
bundle unload.",
+ maxUsageBroker, minimumThroughputToOffload / MB,
minThroughputThreshold / MB);
+ return;
+ }
+ filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(),
maxUsageBroker, localData,
+ minimumThroughputToOffload);
+ }
+
+ private Pair<Boolean, String> getMaxUsageBroker(
+ LoadData loadData, double threshold, double avgUsage) {
+ String maxUsageBrokerName = "";
+ double maxUsage = avgUsage - threshold;
Review Comment:
And we should also add a test for this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]