This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4d54779ff3634e5786a6dd3a2a9ec1574c92c6d1
Author: crossoverJie <crossover...@gmail.com>
AuthorDate: Thu Mar 13 17:59:21 2025 +0800

    [improve][broker] Optimize ThresholdShedder with improved boundary checks and parameter reuse (#24064)

    (cherry picked from commit 7635f3c55b35f341dbc0878c29321a414ae1e9fa)
---
 .../pulsar/broker/loadbalance/impl/ThresholdShedder.java | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 882c72a71c9..3fa3267c8da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -119,7 +119,7 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             }
         });
         if (selectedBundlesCache.isEmpty() && conf.isLowerBoundarySheddingEnabled()) {
-            tryLowerBoundaryShedding(loadData, conf);
+            tryLowerBoundaryShedding(loadData, threshold, conf);
         }
         return selectedBundlesCache;
     }
@@ -182,18 +182,12 @@ public class ThresholdShedder implements LoadSheddingStrategy {
         return historyUsage;
     }

-    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+    private void tryLowerBoundaryShedding(LoadData loadData, double threshold, ServiceConfiguration conf) {
         // Select the broker with the most resource usage.
-        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
         final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
         Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
         boolean hasBrokerBelowLowerBound = result.getLeft();
         String maxUsageBroker = result.getRight();
-        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
-        if (brokerData == null) {
-            log.info("Load data is null or bundle <=1, skipping bundle unload.");
-            return;
-        }
         if (!hasBrokerBelowLowerBound) {
             log.info("No broker is below the lower bound, threshold is {}, "
                             + "avgUsage usage is {}, max usage of Broker {} is {}",
@@ -201,6 +195,11 @@ public class ThresholdShedder implements LoadSheddingStrategy {
                     brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
             return;
         }
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
         LocalBrokerData localData = brokerData.getLocalData();
         double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
         double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;