This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f96e10ce95c1edfe09262f8564b0d92b05cd6a4c
Author: crossoverJie <crossover...@gmail.com>
AuthorDate: Thu Mar 13 17:59:21 2025 +0800

    [improve][broker] Optimize ThresholdShedder with improved boundary checks and parameter reuse (#24064)

    (cherry picked from commit 7635f3c55b35f341dbc0878c29321a414ae1e9fa)
---
 .../pulsar/broker/loadbalance/impl/ThresholdShedder.java | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 86df49f9526..82a4b4ae4ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -116,7 +116,7 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             }
         });
         if (selectedBundlesCache.isEmpty() && conf.isLowerBoundarySheddingEnabled()) {
-            tryLowerBoundaryShedding(loadData, conf);
+            tryLowerBoundaryShedding(loadData, threshold, conf);
         }
         return selectedBundlesCache;
     }
@@ -179,18 +179,12 @@ public class ThresholdShedder implements LoadSheddingStrategy {
         return historyUsage;
     }
 
-    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+    private void tryLowerBoundaryShedding(LoadData loadData, double threshold, ServiceConfiguration conf) {
         // Select the broker with the most resource usage.
-        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
         final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
         Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
         boolean hasBrokerBelowLowerBound = result.getLeft();
         String maxUsageBroker = result.getRight();
-        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
-        if (brokerData == null) {
-            log.info("Load data is null or bundle <=1, skipping bundle unload.");
-            return;
-        }
         if (!hasBrokerBelowLowerBound) {
             log.info("No broker is below the lower bound, threshold is {}, "
                     + "avgUsage usage is {}, max usage of Broker {} is {}",
@@ -198,6 +192,11 @@ public class ThresholdShedder implements LoadSheddingStrategy {
                     brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
             return;
         }
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
         LocalBrokerData localData = brokerData.getLocalData();
         double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
         double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;