This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc86f3b5f06 [fix][broker] fix UniformLoadShedder select wrong 
overloadbroker and underloadbroker (#21025)
fc86f3b5f06 is described below

commit fc86f3b5f066bc71840d42d1163e85f048216aa7
Author: AloysZhang <[email protected]>
AuthorDate: Tue Sep 12 16:25:42 2023 +0800

    [fix][broker] fix UniformLoadShedder select wrong overloadbroker and 
underloadbroker (#21025)
---
 .../loadbalance/impl/UniformLoadShedder.java       | 163 +++++++++++++--------
 .../loadbalance/impl/UniformLoadShedderTest.java   |  78 ++++++++++
 2 files changed, 178 insertions(+), 63 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index b92af5b7c69..e3055246f4b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.mutable.MutableDouble;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
@@ -36,7 +36,7 @@ import 
org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
 
 /**
  * This strategy tends to distribute load uniformly across all brokers. This 
strategy checks load difference between
- * broker with highest load and broker with lowest load. If the difference is 
higher than configured thresholds
+ * broker with the highest load and broker with the lowest load. If the 
difference is higher than configured thresholds
  * {@link 
ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} or
  * {@link 
ServiceConfiguration#getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold()}
 then it finds out
  * bundles which can be unloaded to distribute traffic evenly across all 
brokers.
@@ -63,25 +63,37 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
         Map<String, BundleData> loadBundleData = 
loadData.getBundleDataForLoadShedding();
         Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
 
-        MutableObject<String> overloadedBroker = new MutableObject<>();
-        MutableObject<String> underloadedBroker = new MutableObject<>();
+        MutableObject<String> msgRateOverloadedBroker = new MutableObject<>();
+        MutableObject<String> msgThroughputOverloadedBroker = new 
MutableObject<>();
+        MutableObject<String> msgRateUnderloadedBroker = new MutableObject<>();
+        MutableObject<String> msgThroughputUnderloadedBroker = new 
MutableObject<>();
         MutableDouble maxMsgRate = new MutableDouble(-1);
-        MutableDouble maxThroughputRate = new MutableDouble(-1);
+        MutableDouble maxThroughput = new MutableDouble(-1);
         MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
-        MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);
+        MutableDouble minThroughput = new MutableDouble(Integer.MAX_VALUE);
+
         brokersData.forEach((broker, data) -> {
             double msgRate = data.getLocalData().getMsgRateIn() + 
data.getLocalData().getMsgRateOut();
             double throughputRate = data.getLocalData().getMsgThroughputIn()
                     + data.getLocalData().getMsgThroughputOut();
-            if (msgRate > maxMsgRate.getValue() || throughputRate > 
maxThroughputRate.getValue()) {
-                overloadedBroker.setValue(broker);
+            if (msgRate > maxMsgRate.getValue()) {
+                msgRateOverloadedBroker.setValue(broker);
                 maxMsgRate.setValue(msgRate);
-                maxThroughputRate.setValue(throughputRate);
             }
-            if (msgRate < minMsgRate.getValue() || throughputRate < 
minThroughputRate.getValue()) {
-                underloadedBroker.setValue(broker);
+
+            if (throughputRate > maxThroughput.getValue()) {
+                msgThroughputOverloadedBroker.setValue(broker);
+                maxThroughput.setValue(throughputRate);
+            }
+
+            if (msgRate < minMsgRate.getValue()) {
+                msgRateUnderloadedBroker.setValue(broker);
                 minMsgRate.setValue(msgRate);
-                minThroughputRate.setValue(throughputRate);
+            }
+
+            if (throughputRate < minThroughput.getValue()) {
+                msgThroughputUnderloadedBroker.setValue(broker);
+                minThroughput.setValue(throughputRate);
             }
         });
 
@@ -91,12 +103,12 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
         if (minMsgRate.getValue() <= EPS && minMsgRate.getValue() >= -EPS) {
             minMsgRate.setValue(1.0);
         }
-        if (minThroughputRate.getValue() <= EPS && 
minThroughputRate.getValue() >= -EPS) {
-            minThroughputRate.setValue(1.0);
+        if (minThroughput.getValue() <= EPS && minThroughput.getValue() >= 
-EPS) {
+            minThroughput.setValue(1.0);
         }
         double msgRateDifferencePercentage = ((maxMsgRate.getValue() - 
minMsgRate.getValue()) * 100)
                 / (minMsgRate.getValue());
-        double msgThroughputDifferenceRate = maxThroughputRate.getValue() / 
minThroughputRate.getValue();
+        double msgThroughputDifferenceRate = maxThroughput.getValue() / 
minThroughput.getValue();
 
         // if the threshold matches then find out how much load needs to be 
unloaded by considering number of msgRate
         // and throughput.
@@ -105,66 +117,91 @@ public class UniformLoadShedder implements 
LoadSheddingStrategy {
         boolean isMsgThroughputThresholdExceeded = conf
                 
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0
                 && msgThroughputDifferenceRate > conf
-                        
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
+                
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
 
         if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "Found bundles for uniform load balancing. "
-                                + "overloaded broker {} with 
(msgRate,throughput)= ({},{}) "
-                                + "and underloaded broker {} with 
(msgRate,throughput)= ({},{})",
-                        overloadedBroker.getValue(), maxMsgRate.getValue(), 
maxThroughputRate.getValue(),
-                        underloadedBroker.getValue(), minMsgRate.getValue(), 
minThroughputRate.getValue());
-            }
             MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
                     (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * 
conf.getMaxUnloadPercentage()));
             MutableInt msgThroughputRequiredFromUnloadedBundles = new 
MutableInt(
-                    (int) ((maxThroughputRate.getValue() - 
minThroughputRate.getValue())
+                    (int) ((maxThroughput.getValue() - 
minThroughput.getValue())
                             * conf.getMaxUnloadPercentage()));
-            LocalBrokerData overloadedBrokerData = 
brokersData.get(overloadedBroker.getValue()).getLocalData();
-
-            if (overloadedBrokerData.getBundles().size() > 1
-                && (msgRateRequiredFromUnloadedBundles.getValue() >= 
conf.getMinUnloadMessage()
-                    || msgThroughputRequiredFromUnloadedBundles.getValue() >= 
conf.getMinUnloadMessageThroughput())) {
-                // Sort bundles by throughput, then pick the bundle which can 
help to reduce load uniformly with
-                // under-loaded broker
-                loadBundleData.entrySet().stream()
-                        .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
-                        .map((e) -> {
-                            String bundle = e.getKey();
-                            BundleData bundleData = e.getValue();
-                            TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                            double throughput = isMsgRateThresholdExceeded
-                                    ? shortTermData.getMsgRateIn() + 
shortTermData.getMsgRateOut()
-                                    : shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
-                            return Triple.of(bundle, bundleData, throughput);
-                        }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
-                        .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
-                            if (conf.getMaxUnloadBundleNumPerShedding() != -1
-                                    && selectedBundlesCache.size() >= 
conf.getMaxUnloadBundleNumPerShedding()) {
-                                return;
-                            }
-                            String bundle = e.getLeft();
-                            BundleData bundleData = e.getMiddle();
-                            TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                            double throughput = 
shortTermData.getMsgThroughputIn()
-                                    + shortTermData.getMsgThroughputOut();
-                            double bundleMsgRate = 
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
-                            if (isMsgRateThresholdExceeded) {
+            if (isMsgRateThresholdExceeded) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found bundles for uniform load balancing. "
+                                    + "msgRate overloaded broker: {} with 
msgRate: {}, "
+                                    + "msgRate underloaded broker: {} with 
msgRate: {}",
+                            msgRateOverloadedBroker.getValue(), 
maxMsgRate.getValue(),
+                            msgRateUnderloadedBroker.getValue(), 
minMsgRate.getValue());
+                }
+                LocalBrokerData overloadedBrokerData =
+                        
brokersData.get(msgRateOverloadedBroker.getValue()).getLocalData();
+                if (overloadedBrokerData.getBundles().size() > 1
+                        && (msgRateRequiredFromUnloadedBundles.getValue() >= 
conf.getMinUnloadMessage())) {
+                    // Sort bundles by throughput, then pick the bundle which 
can help to reduce load uniformly with
+                    // under-loaded broker
+                    loadBundleData.entrySet().stream()
+                            .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
+                            .map((e) -> {
+                                String bundle = e.getKey();
+                                TimeAverageMessageData shortTermData = 
e.getValue().getShortTermData();
+                                double msgRate = shortTermData.getMsgRateIn() 
+ shortTermData.getMsgRateOut();
+                                return Pair.of(bundle, msgRate);
+                            }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+                            .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
+                                if (conf.getMaxUnloadBundleNumPerShedding() != 
-1
+                                        && selectedBundlesCache.size() >= 
conf.getMaxUnloadBundleNumPerShedding()) {
+                                    return;
+                                }
+                                String bundle = e.getLeft();
+                                double bundleMsgRate = e.getRight();
                                 if (bundleMsgRate <= 
(msgRateRequiredFromUnloadedBundles.getValue()
                                         + 1000/* delta */)) {
                                     log.info("Found bundle to unload with 
msgRate {}", bundleMsgRate);
                                     
msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
-                                    
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+                                    
selectedBundlesCache.put(msgRateOverloadedBroker.getValue(), bundle);
                                 }
-                            } else {
-                                if (throughput <= 
(msgThroughputRequiredFromUnloadedBundles.getValue())) {
-                                    log.info("Found bundle to unload with 
throughput {}", throughput);
-                                    
msgThroughputRequiredFromUnloadedBundles.add(-throughput);
-                                    
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
+                            });
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found bundles for uniform load balancing. "
+                                    + "msgThroughput overloaded broker: {} 
with msgThroughput {}, "
+                                    + "msgThroughput underloaded broker: {} 
with msgThroughput: {}",
+                            msgThroughputOverloadedBroker.getValue(), 
maxThroughput.getValue(),
+                            msgThroughputUnderloadedBroker.getValue(), 
minThroughput.getValue());
+                }
+                LocalBrokerData overloadedBrokerData =
+                        
brokersData.get(msgThroughputOverloadedBroker.getValue()).getLocalData();
+                if (overloadedBrokerData.getBundles().size() > 1
+                        &&
+                        msgThroughputRequiredFromUnloadedBundles.getValue() >= 
conf.getMinUnloadMessageThroughput()) {
+                    // Sort bundles by throughput, then pick the bundle which 
can help to reduce load uniformly with
+                    // under-loaded broker
+                    loadBundleData.entrySet().stream()
+                            .filter(e -> 
overloadedBrokerData.getBundles().contains(e.getKey()))
+                            .map((e) -> {
+                                String bundle = e.getKey();
+                                TimeAverageMessageData shortTermData = 
e.getValue().getShortTermData();
+                                double msgThroughtput = 
shortTermData.getMsgThroughputIn()
+                                        + shortTermData.getMsgThroughputOut();
+                                return Pair.of(bundle, msgThroughtput);
+                            }).filter(e -> 
!recentlyUnloadedBundles.containsKey(e.getLeft()))
+                            .sorted((e1, e2) -> Double.compare(e2.getRight(), 
e1.getRight())).forEach((e) -> {
+                                if (conf.getMaxUnloadBundleNumPerShedding() != 
-1
+                                        && selectedBundlesCache.size() >= 
conf.getMaxUnloadBundleNumPerShedding()) {
+                                    return;
                                 }
-                            }
-                        });
+                                String bundle = e.getLeft();
+                                double msgThroughput = e.getRight();
+                                if (msgThroughput <= 
(msgThroughputRequiredFromUnloadedBundles.getValue()
+                                        + 1000/* delta */)) {
+                                    log.info("Found bundle to unload with 
msgThroughput {}", msgThroughput);
+                                    
msgThroughputRequiredFromUnloadedBundles.add(-msgThroughput);
+                                    
selectedBundlesCache.put(msgThroughputOverloadedBroker.getValue(), bundle);
+                                }
+                            });
+                }
+
             }
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
index 00182fffb8a..4b4042cf31a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 @Test(groups = "broker")
 public class UniformLoadShedderTest {
@@ -119,4 +120,81 @@ public class UniformLoadShedderTest {
         assertFalse(bundlesToUnload.isEmpty());
     }
 
+    @Test
+    public void testOverloadBrokerSelect() {
+        conf.setMaxUnloadBundleNumPerShedding(1);
+        conf.setMaxUnloadPercentage(0.5);
+        int numBrokers = 5;
+        int numBundles = 5;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData[] localBrokerDatas = new LocalBrokerData[]{
+                new LocalBrokerData(),
+                new LocalBrokerData(),
+                new LocalBrokerData(),
+                new LocalBrokerData(),
+                new LocalBrokerData()};
+
+        String[] brokerNames = new String[]{"broker0", "broker1", "broker2", 
"broker3", "broker4"};
+
+        double[] brokerMsgRates = new double[]{
+                50000, // broker0
+                60000, // broker1
+                70000, // broker2
+                10000, // broker3
+                20000};// broker4
+
+        double[] brokerMsgThroughputs = new double[]{
+                50 * 1024 * 1024, // broker0
+                60 * 1024 * 1024, // broker1
+                70 * 1024 * 1024, // broker2
+                80 * 1024 * 1024, // broker3
+                10 * 1024 * 1024};// broker4
+
+
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            double msgRate = brokerMsgRates[brokerId] / numBundles;
+            double throughput = brokerMsgThroughputs[brokerId] / numBundles;
+            for (int i = 0; i < numBundles; ++i) {
+                String bundleName = "broker-" + brokerId + "-bundle-" + i;
+                localBrokerDatas[brokerId].getBundles().add(bundleName);
+                
localBrokerDatas[brokerId].setMsgRateIn(brokerMsgRates[brokerId]);
+                
localBrokerDatas[brokerId].setMsgThroughputIn(brokerMsgThroughputs[brokerId]);
+                BundleData bundle = new BundleData();
+
+                TimeAverageMessageData timeAverageMessageData = new 
TimeAverageMessageData();
+                timeAverageMessageData.setMsgRateIn(msgRate);
+                timeAverageMessageData.setMsgThroughputIn(throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                loadData.getBundleData().put(bundleName, bundle);
+            }
+           loadData.getBrokerData().put(brokerNames[brokerId], new 
BrokerData(localBrokerDatas[brokerId]));
+        }
+
+        // disable throughput based load shedding, enable rate based load 
shedding only
+        conf.setLoadBalancerMsgRateDifferenceShedderThreshold(50);
+        
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(0);
+
+        Multimap<String, String> bundlesToUnload = 
uniformLoadShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(), 1);
+        assertTrue(bundlesToUnload.containsKey("broker2"));
+
+
+        // disable rate based load shedding, enable throughput based load 
shedding only
+        conf.setLoadBalancerMsgRateDifferenceShedderThreshold(0);
+        
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(2);
+
+        bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData, 
conf);
+        assertEquals(bundlesToUnload.size(), 1);
+        assertTrue(bundlesToUnload.containsKey("broker3"));
+
+        // enable both rate and throughput based load shedding, but rate based 
load shedding has higher priority
+        conf.setLoadBalancerMsgRateDifferenceShedderThreshold(50);
+        
conf.setLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(2);
+
+        bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData, 
conf);
+        assertEquals(bundlesToUnload.size(), 1);
+        assertTrue(bundlesToUnload.containsKey("broker2"));
+    }
+
 }

Reply via email to