This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 222e69a53ff [improve][broker] PIP-192 Improve TransferSheder for 
overload outlier for large clusters (#20059)
222e69a53ff is described below

commit 222e69a53ffc098115251253a2bc4fe8542ce55b
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Apr 11 01:59:04 2023 -0700

    [improve][broker] PIP-192 Improve TransferSheder for overload outlier for 
large clusters (#20059)
---
 .../extensions/scheduler/TransferShedder.java      | 69 +++++++++++++------
 .../extensions/scheduler/TransferShedderTest.java  | 78 ++++++++++++++++++++++
 2 files changed, 126 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 98e05296d60..07d521a28af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -291,6 +291,10 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
             return brokersSortedByLoad.get(minBrokerIndex).getKey();
         }
 
+        String peekMaxBroker() {
+            return brokersSortedByLoad.get(maxBrokerIndex).getKey();
+        }
+
         String pollMaxBroker() {
             return brokersSortedByLoad.get(maxBrokerIndex--).getKey();
         }
@@ -358,7 +362,9 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
 
             final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
             boolean transfer = conf.isLoadBalancerTransferEnabled();
-            if (stats.std() > targetStd || isUnderLoaded(context, 
stats.peekMinBroker(), stats.avg)) {
+            if (stats.std() > targetStd
+                    || isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
+                    || isOverLoaded(context, stats.peekMaxBroker(), 
stats.avg)) {
                 unloadConditionHitCount++;
             } else {
                 unloadConditionHitCount = 0;
@@ -383,28 +389,36 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                     break;
                 }
                 UnloadDecision.Reason reason;
-                if (stats.std() <= targetStd) {
-                    if (!isUnderLoaded(context, stats.peekMinBroker(), 
stats.avg)) {
-                        if (debugMode) {
-                            log.info(CANNOT_CONTINUE_UNLOAD_MSG
-                                            + "The overall cluster load meets 
the target, std:{} <= targetStd:{},"
-                                            + " and minBroker:{} is not 
underloaded.",
-                                    stats.std(), targetStd, 
stats.peekMinBroker());
-                        }
-                        break;
-                    } else {
-                        reason = Underloaded;
-                        if (debugMode) {
-                            log.info(String.format("broker:%s is 
underloaded:%s although "
-                                            + "load std:%.2f <= 
targetStd:%.2f. "
-                                            + "Continuing unload for this 
underloaded broker.",
-                                    stats.peekMinBroker(),
-                                    
context.brokerLoadDataStore().get(stats.peekMinBroker()).get(),
-                                    stats.std(), targetStd));
-                        }
+                if (stats.std() > targetStd) {
+                    reason = Overloaded;
+                } else if (isUnderLoaded(context, stats.peekMinBroker(), 
stats.avg)) {
+                    reason = Underloaded;
+                    if (debugMode) {
+                        log.info(String.format("broker:%s is underloaded:%s 
although "
+                                        + "load std:%.2f <= targetStd:%.2f. "
+                                        + "Continuing unload for this 
underloaded broker.",
+                                stats.peekMinBroker(),
+                                
context.brokerLoadDataStore().get(stats.peekMinBroker()).get(),
+                                stats.std(), targetStd));
                     }
-                } else {
+                } else if (isOverLoaded(context, stats.peekMaxBroker(), 
stats.avg)) {
                     reason = Overloaded;
+                    if (debugMode) {
+                        log.info(String.format("broker:%s is overloaded:%s 
although "
+                                        + "load std:%.2f <= targetStd:%.2f. "
+                                        + "Continuing unload for this 
overloaded broker.",
+                                stats.peekMaxBroker(),
+                                
context.brokerLoadDataStore().get(stats.peekMaxBroker()).get(),
+                                stats.std(), targetStd));
+                    }
+                } else {
+                    if (debugMode) {
+                        log.info(CANNOT_CONTINUE_UNLOAD_MSG
+                                        + "The overall cluster load meets the 
target, std:{} <= targetStd:{}."
+                                        + "minBroker:{} is not underloaded. 
maxBroker:{} is not overloaded.",
+                                stats.std(), targetStd, stats.peekMinBroker(), 
stats.peekMaxBroker());
+                    }
+                    break;
                 }
 
                 String maxBroker = stats.pollMaxBroker();
@@ -671,6 +685,19 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                 
context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
     }
 
+    private boolean isOverLoaded(LoadManagerContext context, String broker, 
double avgLoad) {
+        var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
+        if (brokerLoadDataOptional.isEmpty()) {
+            return false;
+        }
+        var conf = context.brokerConfiguration();
+        var overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        var targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
+        var brokerLoadData = brokerLoadDataOptional.get();
+        var load = brokerLoadData.getWeightedMaxEMA();
+        return load > overloadThreshold && load > avgLoad + targetStd;
+    }
+
 
     private boolean isTransferable(LoadManagerContext context,
                                    Map<String, BrokerLookupData> 
availableBrokers,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 3279cb4e475..d673722d6fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -200,6 +200,54 @@ public class TransferShedderTest {
         return ctx;
     }
 
+    public LoadManagerContext setupContextLoadSkewedOverload(int clusterSize) {
+        var ctx = getContext();
+
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+        int i = 0;
+        for (; i < clusterSize-1; i++) {
+            int brokerLoad = 1;
+            topBundlesLoadDataStore.pushAsync("broker" + i, 
getTopBundlesLoad("my-tenant/my-namespace" + i,
+                    300_000, 700_000));
+            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  
brokerLoad, "broker" + i));
+        }
+        int brokerLoad = 100;
+        topBundlesLoadDataStore.pushAsync("broker" + i, 
getTopBundlesLoad("my-tenant/my-namespace" + i,
+                30_000_000, 70_000_000));
+        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  
brokerLoad, "broker" + i));
+
+        return ctx;
+    }
+
+    public LoadManagerContext setupContextLoadSkewedUnderload(int clusterSize) 
{
+        var ctx = getContext();
+
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+        int i = 0;
+        for (; i < clusterSize-2; i++) {
+            int brokerLoad = 98;
+            topBundlesLoadDataStore.pushAsync("broker" + i, 
getTopBundlesLoad("my-tenant/my-namespace" + i,
+                    30_000_000, 70_000_000));
+            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  
brokerLoad, "broker" + i));
+        }
+
+        int brokerLoad = 99;
+        topBundlesLoadDataStore.pushAsync("broker" + i, 
getTopBundlesLoad("my-tenant/my-namespace" + i,
+                30_000_000, 70_000_000));
+        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  
brokerLoad, "broker" + i));
+        i++;
+
+        brokerLoad = 1;
+        topBundlesLoadDataStore.pushAsync("broker" + i, 
getTopBundlesLoad("my-tenant/my-namespace" + i,
+                300_000, 700_000));
+        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  
brokerLoad, "broker" + i));
+        return ctx;
+    }
+
     public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load, String 
broker) {
         var loadData = new BrokerLoadData();
         SystemResourceUsage usage1 = new SystemResourceUsage();
@@ -1176,6 +1224,36 @@ public class TransferShedderTest {
         }
     }
 
+    @Test
+    public void testOverloadOutlier() {
+        UnloadCounter counter = new UnloadCounter();
+        TransferShedder transferShedder = new TransferShedder(counter);
+        var ctx = setupContextLoadSkewedOverload(100);
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), 
Map.of());
+        var expected = new HashSet<UnloadDecision>();
+        expected.add(new UnloadDecision(
+                new Unload("broker99", 
"my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+                        Optional.of("broker52")), Success, Overloaded));
+        assertEquals(res, expected);
+        assertEquals(counter.getLoadAvg(), 0.019900000000000008);
+        assertEquals(counter.getLoadStd(), 0.09850375627355534);
+    }
+
+    @Test
+    public void testUnderloadOutlier() {
+        UnloadCounter counter = new UnloadCounter();
+        TransferShedder transferShedder = new TransferShedder(counter);
+        var ctx = setupContextLoadSkewedUnderload(100);
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), 
Map.of());
+        var expected = new HashSet<UnloadDecision>();
+        expected.add(new UnloadDecision(
+                new Unload("broker98", 
"my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
+                        Optional.of("broker99")), Success, Underloaded));
+        assertEquals(res, expected);
+        assertEquals(counter.getLoadAvg(), 0.9704000000000005);
+        assertEquals(counter.getLoadStd(), 0.09652895938523735);
+    }
+
     @Test
     public void testRandomLoadStats() {
         int numBrokers = 10;

Reply via email to