This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 33cc1ca82d1 [fix][broker]Backport fix UniformLoadShedder select wrong 
overloadbroker and underloadbroker (#21182)
33cc1ca82d1 is described below

commit 33cc1ca82d10dfd8fb9e88163a342a3e5cf350ea
Author: AloysZhang <[email protected]>
AuthorDate: Tue Sep 19 18:08:19 2023 +0800

    [fix][broker]Backport fix UniformLoadShedder select wrong overloadbroker 
and underloadbroker (#21182)
---
 .../loadbalance/impl/UniformLoadShedder.java       | 151 +++++++++++++--------
 1 file changed, 92 insertions(+), 59 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index df8e1b66509..dd51a17c186 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.mutable.MutableDouble;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -36,9 +36,9 @@ import 
org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 
 /**
  * This strategy tends to distribute load uniformly across all brokers. This 
strategy checks load difference between
- * broker with highest load and broker with lowest load. If the difference is 
higher than configured thresholds
+ * broker with the highest load and broker with the lowest load. If the 
difference is higher than configured thresholds
  * {@link 
ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} or
- * {@link 
ServiceConfiguration#loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
 then it finds out
+ * {@link 
ServiceConfiguration#getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
 then it finds out
  * bundles which can be unloaded to distribute traffic evenly across all 
brokers.
  *
  */
@@ -68,12 +68,15 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
         Map<String, BundleData> loadBundleData = 
loadData.getBundleDataForLoadShedding();
         Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
 
-        MutableObject<String> overloadedBroker = new MutableObject<>();
-        MutableObject<String> underloadedBroker = new MutableObject<>();
+        MutableObject<String> msgRateOverloadedBroker = new MutableObject<>();
+        MutableObject<String> msgThroughputOverloadedBroker = new 
MutableObject<>();
+        MutableObject<String> msgRateUnderloadedBroker = new MutableObject<>();
+        MutableObject<String> msgThroughputUnderloadedBroker = new 
MutableObject<>();
         MutableDouble maxMsgRate = new MutableDouble(-1);
-        MutableDouble maxThroughputRate = new MutableDouble(-1);
+        MutableDouble maxThroughput = new MutableDouble(-1);
         MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
-        MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);
+        MutableDouble minThroughput = new MutableDouble(Integer.MAX_VALUE);
+
         brokersData.forEach((broker, data) -> {
             //broker with one bundle can't be considered for bundle unloading
             if (data.getLocalData().getBundles().size() <= 1) {
@@ -83,15 +86,24 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
             double msgRate = data.getLocalData().getMsgRateIn() + 
data.getLocalData().getMsgRateOut();
             double throughputRate = data.getLocalData().getMsgThroughputIn()
                     + data.getLocalData().getMsgThroughputOut();
-            if (msgRate > maxMsgRate.getValue() || throughputRate > 
maxThroughputRate.getValue()) {
-                overloadedBroker.setValue(broker);
+            if (msgRate > maxMsgRate.getValue()) {
+                msgRateOverloadedBroker.setValue(broker);
                 maxMsgRate.setValue(msgRate);
-                maxThroughputRate.setValue(throughputRate);
             }
-            if (msgRate < minMsgRate.getValue() || throughputRate < 
minThroughputRate.getValue()) {
-                underloadedBroker.setValue(broker);
+
+            if (throughputRate > maxThroughput.getValue()) {
+                msgThroughputOverloadedBroker.setValue(broker);
+                maxThroughput.setValue(throughputRate);
+            }
+
+            if (msgRate < minMsgRate.getValue()) {
+                msgRateUnderloadedBroker.setValue(broker);
                 minMsgRate.setValue(msgRate);
-                minThroughputRate.setValue(throughputRate);
+            }
+
+            if (throughputRate < minThroughput.getValue()) {
+                msgThroughputUnderloadedBroker.setValue(broker);
+                minThroughput.setValue(throughputRate);
             }
         });
 
@@ -101,12 +113,12 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
         if (minMsgRate.getValue() <= EPS && minMsgRate.getValue() >= -EPS) {
             minMsgRate.setValue(1.0);
         }
-        if (minThroughputRate.getValue() <= EPS && 
minThroughputRate.getValue() >= -EPS) {
-            minThroughputRate.setValue(1.0);
+        if (minThroughput.getValue() <= EPS && minThroughput.getValue() >= 
-EPS) {
+            minThroughput.setValue(1.0);
         }
         double msgRateDifferencePercentage = ((maxMsgRate.getValue() - 
minMsgRate.getValue()) * 100)
                 / (minMsgRate.getValue());
-        double msgThroughputDifferenceRate = maxThroughputRate.getValue() / 
minThroughputRate.getValue();
+        double msgThroughputDifferenceRate = maxThroughput.getValue() / 
minThroughput.getValue();
 
         // if the threshold matches then find out how much load needs to be 
unloaded by considering number of msgRate
         // and throughput.
@@ -118,58 +130,79 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
                         
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
 
         if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "Found bundles for uniform load balancing. "
-                                + "overloaded broker {} with 
(msgRate,throughput)= ({},{}) "
-                                + "and underloaded broker {} with 
(msgRate,throughput)= ({},{})",
-                        overloadedBroker.getValue(), maxMsgRate.getValue(), 
maxThroughputRate.getValue(),
-                        underloadedBroker.getValue(), minMsgRate.getValue(), 
minThroughputRate.getValue());
-            }
             MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
                     (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * 
MAX_UNLOAD_PERCENTAGE));
             MutableInt msgThroughputRequiredFromUnloadedBundles = new 
MutableInt(
-                    (int) ((maxThroughputRate.getValue() - 
minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
-            LocalBrokerData overloadedBrokerData = 
brokersData.get(overloadedBroker.getValue()).getLocalData();
-
-            if (overloadedBrokerData.getBundles().size() > 1
-                && (msgRateRequiredFromUnloadedBundles.getValue() >= 
MIN_UNLOAD_MESSAGE
-                    || msgThroughputRequiredFromUnloadedBundles.getValue() >= 
MIN_UNLOAD_THROUGHPUT)) {
-                // Sort bundles by throughput, then pick the bundle which can 
help to reduce load uniformly with
-                // under-loaded broker
-                loadBundleData.entrySet().stream()
-                        .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
-                        .map((e) -> {
-                            String bundle = e.getKey();
-                            BundleData bundleData = e.getValue();
-                            TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                            double throughput = isMsgRateThresholdExceeded
-                                    ? shortTermData.getMsgRateIn() + 
shortTermData.getMsgRateOut()
-                                    : shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
-                            return Triple.of(bundle, bundleData, throughput);
-                        }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
-                        .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
-                            String bundle = e.getLeft();
-                            BundleData bundleData = e.getMiddle();
-                            TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                            double throughput = 
shortTermData.getMsgThroughputIn()
-                                    + shortTermData.getMsgThroughputOut();
-                            double bundleMsgRate = 
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
-                            if (isMsgRateThresholdExceeded) {
+                    (int) ((maxThroughput.getValue() - 
minThroughput.getValue()) * MAX_UNLOAD_PERCENTAGE));
+            if (isMsgRateThresholdExceeded) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found bundles for uniform load balancing. "
+                                    + "msgRate overloaded broker: {} with 
msgRate: {}, "
+                                    + "msgRate underloaded broker: {} with 
msgRate: {}",
+                            msgRateOverloadedBroker.getValue(), 
maxMsgRate.getValue(),
+                            msgRateUnderloadedBroker.getValue(), 
minMsgRate.getValue());
+                }
+                LocalBrokerData overloadedBrokerData =
+                        
brokersData.get(msgRateOverloadedBroker.getValue()).getLocalData();
+                if (overloadedBrokerData.getBundles().size() > 1
+                        && (msgRateRequiredFromUnloadedBundles.getValue() >= 
MIN_UNLOAD_MESSAGE)) {
+                    // Sort bundles by msgRate, then pick the bundle which 
can help to reduce load uniformly with
+                    // under-loaded broker
+                    loadBundleData.entrySet().stream()
+                            .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
+                            .map((e) -> {
+                                String bundle = e.getKey();
+                                TimeAverageMessageData shortTermData = 
e.getValue().getShortTermData();
+                                double msgRate = shortTermData.getMsgRateIn() 
+ shortTermData.getMsgRateOut();
+                                return Pair.of(bundle, msgRate);
+                            }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+                            .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
+                                String bundle = e.getLeft();
+                                double bundleMsgRate = e.getRight();
                                 if (bundleMsgRate <= 
(msgRateRequiredFromUnloadedBundles.getValue()
                                         + 1000/* delta */)) {
                                     log.info("Found bundle to unload with 
msgRate {}", bundleMsgRate);
                                     
msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
-                                    
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+                                    
selectedBundlesCache.put(msgRateOverloadedBroker.getValue(), bundle);
                                 }
-                            } else {
-                                if (throughput <= 
(msgThroughputRequiredFromUnloadedBundles.getValue())) {
-                                    log.info("Found bundle to unload with 
throughput {}", throughput);
-                                    
msgThroughputRequiredFromUnloadedBundles.add(-throughput);
-                                    
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+                            });
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found bundles for uniform load balancing. "
+                                    + "msgThroughput overloaded broker: {} 
with msgThroughput {}, "
+                                    + "msgThroughput underloaded broker: {} 
with msgThroughput: {}",
+                            msgThroughputOverloadedBroker.getValue(), 
maxThroughput.getValue(),
+                            msgThroughputUnderloadedBroker.getValue(), 
minThroughput.getValue());
+                }
+                LocalBrokerData overloadedBrokerData =
+                        
brokersData.get(msgThroughputOverloadedBroker.getValue()).getLocalData();
+                if (overloadedBrokerData.getBundles().size() > 1
+                        &&
+                        msgThroughputRequiredFromUnloadedBundles.getValue() >= 
MIN_UNLOAD_THROUGHPUT) {
+                    // Sort bundles by throughput, then pick the bundle which 
can help to reduce load uniformly with
+                    // under-loaded broker
+                    loadBundleData.entrySet().stream()
+                            .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
+                            .map((e) -> {
+                                String bundle = e.getKey();
+                                TimeAverageMessageData shortTermData = 
e.getValue().getShortTermData();
+                                double msgThroughput = 
shortTermData.getMsgThroughputIn()
+                                        + shortTermData.getMsgThroughputOut();
+                                return Pair.of(bundle, msgThroughput);
+                            }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+                            .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
+                                String bundle = e.getLeft();
+                                double msgThroughput = e.getRight();
+                                if (msgThroughput <= 
(msgThroughputRequiredFromUnloadedBundles.getValue()
+                                        + 1000/* delta */)) {
+                                    log.info("Found bundle to unload with 
msgThroughput {}", msgThroughput);
+                                    
msgThroughputRequiredFromUnloadedBundles.add(-msgThroughput);
+                                    
selectedBundlesCache.put(msgThroughputOverloadedBroker.getValue(), bundle);
                                 }
-                            }
-                        });
+                            });
+                }
+
             }
         }
 

Reply via email to