This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f6cc450  Load manager should offload multiple bundles when overloaded 
(#1488)
f6cc450 is described below

commit f6cc45037c215ef1085b4a0bfc7e3a3fd7ff9f1c
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Apr 4 08:53:37 2018 -0700

    Load manager should offload multiple bundles when overloaded (#1488)
    
    * Load manager should offload multiple bundles when overloaded
    
    * Added test
    
    * Fixed import
    
    * Fixed logger import
---
 conf/broker.conf                                   |   9 +-
 conf/standalone.conf                               |   4 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   4 +-
 .../broker/loadbalance/LoadSheddingStrategy.java   |   6 +-
 .../broker/loadbalance/impl/DeviationShedder.java  |  10 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  47 +++---
 .../broker/loadbalance/impl/OverloadShedder.java   | 124 ++++++++------
 .../loadbalance/impl/OverloadShedderTest.java      | 182 +++++++++++++++++++++
 .../data/loadbalancer/LocalBrokerData.java         |  24 ++-
 9 files changed, 313 insertions(+), 97 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index cec3c0f..d78fbbc 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -189,7 +189,7 @@ maxConsumersPerSubscription=0
 proxyRoles=
 
 # If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).  
+# else it just accepts the originalPrincipal and authorizes it (if required).
 authenticateOriginalAuthData=false
 
 # Enable TLS
@@ -356,7 +356,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
 
 # Load shedding interval. Broker periodically checks whether some traffic 
should be offload from
 # some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
 
 # Prevent the same topics to be shed and moved to other broker more that once 
within this timeframe
 loadBalancerSheddingGracePeriodMinutes=30
@@ -388,11 +388,11 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
 # maximum number of bundles in a namespace
 loadBalancerNamespaceMaximumBundles=128
 
-# Override the auto-detection of the network interfaces max speed. 
+# Override the auto-detection of the network interfaces max speed.
 # This option is useful in some environments (eg: EC2 VMs) where the max speed
 # reported by Linux is not reflecting the real bandwidth available to the 
broker.
 # Since the network usage is employed by the load manager to decide when a 
broker
-# is overloaded, it is important to make sure the info is correct or override 
it 
+# is overloaded, it is important to make sure the info is correct or override 
it
 # with the right value here. The configured value can be a double (eg: 0.8) 
and that
 # can be used to trigger load-shedding even before hitting on NIC limits.
 loadBalancerOverrideBrokerNicSpeedGbps=
@@ -452,4 +452,3 @@ exposeTopicLevelMetricsInPrometheus=true
 
 # Enable Functions Worker Service in Broker
 functionsWorkerEnabled=false
-
diff --git a/conf/standalone.conf b/conf/standalone.conf
index dc6f1b7..b593f00 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -172,7 +172,7 @@ maxConsumersPerSubscription=0
 proxyRoles=
 
 # If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required). 
+# else it just accepts the originalPrincipal and authorizes it (if required).
 authenticateOriginalAuthData=false
 
 # Enable authentication
@@ -318,7 +318,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
 
 # Load shedding interval. Broker periodically checks whether some traffic 
should be offload from
 # some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
 
 # Prevent the same topics to be shed and moved to other broker more that once 
within this timeframe
 loadBalancerSheddingGracePeriodMinutes=30
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e09ba9..dd9cabd 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -365,7 +365,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private boolean loadBalancerSheddingEnabled = true;
     // Load shedding interval. Broker periodically checks whether some traffic 
should be offload from some over-loaded
     // broker to other under-loaded brokers
-    private int loadBalancerSheddingIntervalMinutes = 5;
+    private int loadBalancerSheddingIntervalMinutes = 1;
     // Prevent the same topics to be shed and moved to other broker more that
     // once within this timeframe
     private long loadBalancerSheddingGracePeriodMinutes = 30;
@@ -375,7 +375,9 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // Usage threshold to allocate max number of topics to broker
     @FieldContext(dynamic = true)
     private int loadBalancerBrokerMaxTopics = 50000;
+
     // Usage threshold to determine a broker as over-loaded
+    @FieldContext(dynamic = true)
     private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
     // Interval to flush dynamic resource quota to ZooKeeper
     private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
index c5e8f97..ef2e546 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
-import java.util.Map;
+import com.google.common.collect.Multimap;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 
@@ -29,12 +29,12 @@ public interface LoadSheddingStrategy {
 
     /**
      * Recommend that all of the returned bundles be unloaded.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
      *            The service configuration.
      * @return A map from all selected bundles to the brokers on which they 
reside.
      */
-    Map<String, String> findBundlesForUnloading(LoadData loadData, 
ServiceConfiguration conf);
+    Multimap<String, String> findBundlesForUnloading(LoadData loadData, 
ServiceConfiguration conf);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
index cb9d6d0..b096018 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 import java.util.Map;
 import java.util.TreeSet;
 
@@ -61,7 +63,7 @@ public abstract class DeviationShedder implements 
LoadSheddingStrategy {
     /**
      * Recommend that all of the returned bundles be unloaded based on 
observing excessive standard deviations according
      * to some metric.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
@@ -69,8 +71,8 @@ public abstract class DeviationShedder implements 
LoadSheddingStrategy {
      * @return A map from all selected bundles to the brokers on which they 
reside.
      */
     @Override
-    public Map<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
-        final Map<String, String> result = new HashMap<>();
+    public Multimap<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
+        final Multimap<String, String> result = ArrayListMultimap.create();
         bundleTreeSetCache.clear();
         metricTreeSetCache.clear();
         double sum = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4b72a10..23f651e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,11 @@ import static 
org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,6 +61,7 @@ import 
org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -79,10 +85,6 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 public class ModularLoadManagerImpl implements ModularLoadManager, 
ZooKeeperCacheListener<LocalBrokerData> {
     private static final Logger log = 
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
 
@@ -192,7 +194,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
         loadSheddingPipeline = new ArrayList<>();
-        loadSheddingPipeline.add(new OverloadShedder(conf));
+        loadSheddingPipeline.add(new OverloadShedder());
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -583,28 +585,27 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
                 - 
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         recentlyUnloadedBundles.keySet().removeIf(e -> 
recentlyUnloadedBundles.get(e) < timeout);
+
         for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
-            final Map<String, String> bundlesToUnload = 
strategy.findBundlesForUnloading(loadData, conf);
-            if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) {
-                try {
-                    for (Map.Entry<String, String> entry : 
bundlesToUnload.entrySet()) {
-                        final String broker = entry.getKey();
-                        final String bundle = entry.getValue();
-
-                        final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
-                        final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-                        if(!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
-                            continue;
-                        }
-                        log.info("Unloading bundle: {} from broker {}", 
bundle, broker);
+            final Multimap<String, String> bundlesToUnload = 
strategy.findBundlesForUnloading(loadData, conf);
+
+            bundlesToUnload.asMap().forEach((broker, bundles) -> {
+                bundles.forEach(bundle -> {
+                    final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                    final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                    if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
+                        return;
+                    }
+
+                    log.info("[Overload shedder] Unloading bundle: {} from 
broker {}", bundle, broker);
+                    try {
                         
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, 
bundleRange);
                         loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
+                    } catch (PulsarServerException | PulsarAdminException e) {
+                        log.warn("Error when trying to perform load shedding 
on {} for broker {}", bundle, broker, e);
                     }
-                } catch (Exception e) {
-                    log.warn("Error when trying to perform load shedding", e);
-                }
-                return;
-            }
+                });
+            });
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index ed71def..972bfc9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 import java.util.Map;
 
-import org.apache.pulsar.broker.BrokerData;
+import org.apache.bookkeeper.mledger.util.Pair;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
 import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
@@ -40,21 +44,15 @@ import org.slf4j.LoggerFactory;
  * rate that has not been recently unloaded.
  */
 public class OverloadShedder implements LoadSheddingStrategy {
+
     private static final Logger log = 
LoggerFactory.getLogger(OverloadShedder.class);
-    private Map<String, String> selectedBundlesCache;
 
-    /**
-     * Create an OverloadShedder with the service configuration.
-     *
-     * @param conf
-     *            Service configuration to create from.
-     */
-    public OverloadShedder(final ServiceConfiguration conf) {
-        selectedBundlesCache = new HashMap<>();
-    }
+    private final Multimap<String, String> selectedBundlesCache = 
ArrayListMultimap.create();
+
+    private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
 
     /**
-     * Attempt to shed one bundle off every broker which is overloaded.
+     * Attempt to shed some bundles off every broker which is overloaded.
      *
      * @param loadData
      *            The load data to used to make the unloading decision.
@@ -62,51 +60,73 @@ public class OverloadShedder implements 
LoadSheddingStrategy {
      *            The service configuration.
      * @return A map from bundles to unload to the brokers on which they are 
loaded.
      */
-    public Map<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
+    public Multimap<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
         selectedBundlesCache.clear();
         final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
-        for (final Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
-            final String broker = entry.getKey();
-            final BrokerData brokerData = entry.getValue();
+
+        // Check every broker and select
+        loadData.getBrokerData().forEach((broker, brokerData) -> {
+
             final LocalBrokerData localData = brokerData.getLocalData();
-            final double maxUsage = localData.getMaxResourceUsage();
-            if (maxUsage >= overloadThreshold) {
-                log.info("Attempting to shed load on {}, which has max 
resource usage {}%", broker, maxUsage);
-                double maxMessageRate = Double.NEGATIVE_INFINITY;
-                String mostTaxingBundle = null;
-                if (localData.getBundles().size() > 1) {
-                    for (final String bundle : localData.getBundles()) {
-                        final BundleData bundleData = 
loadData.getBundleData().get(bundle);
-                        if (bundleData == null) {
-                            continue;
-                        }
-
-                        // Consider short-term message rate to address system 
resource burden
-                        final TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                        final double messageRate = 
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
-                        // The burden of checking the timestamp is for the 
load manager, not the strategy.
-                        if (messageRate > maxMessageRate && 
!recentlyUnloadedBundles.containsKey(bundle)) {
-                            maxMessageRate = messageRate;
-                            mostTaxingBundle = bundle;
-                        }
-                    }
-                    if (mostTaxingBundle != null) {
-                        selectedBundlesCache.put(broker, mostTaxingBundle);
-                    } else {
-                        log.warn("Load shedding could not be performed on 
broker {} because all bundles assigned to it "
-                                + "have recently been unloaded");
-                    }
-                } else if (localData.getBundles().size() == 1) {
-                    log.warn(
-                            "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
-                                    + "No Load Shedding will be done on this 
broker",
-                            localData.getBundles().iterator().next(), broker);
-                } else {
-                    log.warn("Broker {} is overloaded despite having no 
bundles", broker);
+            final double currentUsage = localData.getMaxResourceUsage();
+            if (currentUsage < overloadThreshold) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Broker is not overloaded, ignoring at this 
point", broker);
                 }
+                return;
             }
-        }
+
+            // We want to offload enough traffic such that this broker will go 
below the overload threshold
+            // Also, add a small margin so that this broker won't be very 
close to the threshold edge.
+            double percentOfTrafficToOffload = currentUsage - 
overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+
+            double minimumThroughputToOffload = brokerCurrentThroughput * 
percentOfTrafficToOffload;
+
+            log.info(
+                    "Attempting to shed load on {}, which has max resource 
usage above threshold {}% > {}% -- Offloading at least {} MByte/s of traffic",
+                    broker, currentUsage, overloadThreshold, 
minimumThroughputToOffload / 1024 / 1024);
+
+            MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+            MutableBoolean atLeastOneBundleSelected = new 
MutableBoolean(false);
+
+            if (localData.getBundles().size() > 1) {
+                // Sort bundles by throughput, then pick the biggest N which 
combined make up for at least the minimum throughput to offload
+
+                loadData.getBundleData().entrySet().stream().map((e) -> {
+                    // Map to throughput value
+                    // Consider short-term byte rate to address system 
resource burden
+                    String bundle = e.getKey();
+                    BundleData bundleData = e.getValue();
+                    TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
+                    double throughput = shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
+                    return Pair.create(bundle, throughput);
+                }).filter(e -> {
+                    // Only consider bundles that were not already unloaded 
recently
+                    return !recentlyUnloadedBundles.containsKey(e.first);
+                }).sorted((e1, e2) -> {
+                    // Sort by throughput in reverse order
+                    return Double.compare(e2.second, e1.second);
+                }).forEach(e -> {
+                    if (trafficMarkedToOffload.doubleValue() < 
minimumThroughputToOffload
+                            || atLeastOneBundleSelected.isFalse()) {
+                       selectedBundlesCache.put(broker, e.first);
+                       trafficMarkedToOffload.add(e.second);
+                       atLeastOneBundleSelected.setTrue();
+                   }
+                });
+            } else if (localData.getBundles().size() == 1) {
+                log.warn(
+                        "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
+                                + "No Load Shedding will be done on this 
broker",
+                        localData.getBundles().iterator().next(), broker);
+            } else {
+                log.warn("Broker {} is overloaded despite having no bundles", 
broker);
+            }
+
+        });
+
         return selectedBundlesCache;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
new file mode 100644
index 0000000..7b0df5d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+public class OverloadShedderTest {
+
+    private final OverloadShedder os = new OverloadShedder();
+    private final ServiceConfiguration conf;
+
+    public OverloadShedderTest() {
+        conf = new ServiceConfiguration();
+        conf.setLoadBalancerBrokerOverloadedThresholdPercentage(85);
+    }
+
+    @Test
+    public void testNoBrokers() {
+        LoadData loadData = new LoadData();
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokersWithNoBundles() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerNotOverloaded() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(500, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(500, 1000));
+        broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+        BundleData bundle1 = new BundleData();
+        TimeAverageMessageData db1 = new TimeAverageMessageData();
+        db1.setMsgThroughputIn(1000);
+        db1.setMsgThroughputOut(1000);
+        bundle1.setShortTermData(db1);
+        loadData.getBundleData().put("bundle-1", bundle1);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerWithSingleBundle() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+        broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+        BundleData bundle1 = new BundleData();
+        TimeAverageMessageData db1 = new TimeAverageMessageData();
+        db1.setMsgThroughputIn(1000);
+        db1.setMsgThroughputOut(1000);
+        bundle1.setShortTermData(db1);
+        loadData.getBundleData().put("bundle-1", bundle1);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerWithMultipleBundles() {
+        int numBundles = 10;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+        double brokerThroghput = 0;
+
+        for (int i = 1; i <= numBundles; i++) {
+            broker1.getBundles().add("bundle-" + i);
+
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData db = new TimeAverageMessageData();
+
+            double throughput = i * 1024 * 1024;
+            db.setMsgThroughputIn(throughput);
+            db.setMsgThroughputOut(throughput);
+            bundle.setShortTermData(db);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+
+            brokerThroghput += throughput;
+        }
+
+        broker1.setMsgThroughputIn(brokerThroghput);
+        broker1.setMsgThroughputOut(brokerThroghput);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        Multimap<String, String> bundlesToUnload = 
os.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+        assertEquals(bundlesToUnload.get("broker-1"), 
Lists.newArrayList("bundle-10", "bundle-9"));
+    }
+
+    @Test
+    public void testFilterRecentlyUnloaded() {
+        int numBundles = 10;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+        double brokerThroghput = 0;
+
+        for (int i = 1; i <= numBundles; i++) {
+            broker1.getBundles().add("bundle-" + i);
+
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData db = new TimeAverageMessageData();
+
+            double throughput = i * 1024 * 1024;
+            db.setMsgThroughputIn(throughput);
+            db.setMsgThroughputOut(throughput);
+            bundle.setShortTermData(db);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+
+            brokerThroghput += throughput;
+        }
+
+        broker1.setMsgThroughputIn(brokerThroghput);
+        broker1.setMsgThroughputOut(brokerThroghput);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        loadData.getRecentlyUnloadedBundles().put("bundle-10", 1L);
+        loadData.getRecentlyUnloadedBundles().put("bundle-9", 1L);
+
+        Multimap<String, String> bundlesToUnload = 
os.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+        assertEquals(bundlesToUnload.get("broker-1"), 
Lists.newArrayList("bundle-8", "bundle-7"));
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 4c754d2..129cf5d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -108,7 +108,7 @@ public class LocalBrokerData extends JSONWritable 
implements LoadManagerReport {
 
     /**
      * Using the system resource usage and bundle stats acquired from the 
Pulsar client, update this LocalBrokerData.
-     * 
+     *
      * @param systemResourceUsage
      *            System resource usage (cpu, memory, and direct memory).
      * @param bundleStats
@@ -123,7 +123,7 @@ public class LocalBrokerData extends JSONWritable 
implements LoadManagerReport {
 
     /**
      * Using another LocalBrokerData, update this.
-     * 
+     *
      * @param other
      *            LocalBrokerData to update from.
      */
@@ -196,10 +196,20 @@ public class LocalBrokerData extends JSONWritable 
implements LoadManagerReport {
     }
 
     public double getMaxResourceUsage() {
-        return Math
-                .max(Math.max(Math.max(cpu.percentUsage(), 
memory.percentUsage()),
-                        Math.max(directMemory.percentUsage(), 
bandwidthIn.percentUsage())), bandwidthOut.percentUsage())
-                / 100;
+        return max(cpu.percentUsage(), memory.percentUsage(), 
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    private static float max(float...args) {
+        float max = Float.NEGATIVE_INFINITY;
+
+        for (float d : args) {
+            if (d > max) {
+                max = d;
+            }
+        }
+
+        return max;
     }
 
     public String getLoadReportType() {
@@ -392,7 +402,7 @@ public class LocalBrokerData extends JSONWritable 
implements LoadManagerReport {
     public String getPulsarServiceUrlTls() {
         return pulsarServiceUrlTls;
     }
-    
+
     @Override
     public boolean isPersistentTopicsEnabled() {
         return persistentTopicsEnabled;

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to