merlimat closed pull request #1488: Load manager should offload multiple 
bundles when overloaded
URL: https://github.com/apache/incubator-pulsar/pull/1488
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index cec3c0fc9..d78fbbc02 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -189,7 +189,7 @@ maxConsumersPerSubscription=0
 proxyRoles=
 
 # If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).  
+# else it just accepts the originalPrincipal and authorizes it (if required).
 authenticateOriginalAuthData=false
 
 # Enable TLS
@@ -356,7 +356,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
 
 # Load shedding interval. Broker periodically checks whether some traffic 
should be offload from
 # some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
 
 # Prevent the same topics to be shed and moved to other broker more that once 
within this timeframe
 loadBalancerSheddingGracePeriodMinutes=30
@@ -388,11 +388,11 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
 # maximum number of bundles in a namespace
 loadBalancerNamespaceMaximumBundles=128
 
-# Override the auto-detection of the network interfaces max speed. 
+# Override the auto-detection of the network interfaces max speed.
 # This option is useful in some environments (eg: EC2 VMs) where the max speed
 # reported by Linux is not reflecting the real bandwidth available to the 
broker.
 # Since the network usage is employed by the load manager to decide when a 
broker
-# is overloaded, it is important to make sure the info is correct or override 
it 
+# is overloaded, it is important to make sure the info is correct or override 
it
 # with the right value here. The configured value can be a double (eg: 0.8) 
and that
 # can be used to trigger load-shedding even before hitting on NIC limits.
 loadBalancerOverrideBrokerNicSpeedGbps=
@@ -452,4 +452,3 @@ exposeTopicLevelMetricsInPrometheus=true
 
 # Enable Functions Worker Service in Broker
 functionsWorkerEnabled=false
-
diff --git a/conf/standalone.conf b/conf/standalone.conf
index dc6f1b743..b593f00d6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -172,7 +172,7 @@ maxConsumersPerSubscription=0
 proxyRoles=
 
 # If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required). 
+# else it just accepts the originalPrincipal and authorizes it (if required).
 authenticateOriginalAuthData=false
 
 # Enable authentication
@@ -318,7 +318,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
 
 # Load shedding interval. Broker periodically checks whether some traffic 
should be offload from
 # some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
 
 # Prevent the same topics to be shed and moved to other broker more that once 
within this timeframe
 loadBalancerSheddingGracePeriodMinutes=30
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e09ba9ad..dd9cabdc3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -365,7 +365,7 @@
     private boolean loadBalancerSheddingEnabled = true;
     // Load shedding interval. Broker periodically checks whether some traffic 
should be offload from some over-loaded
     // broker to other under-loaded brokers
-    private int loadBalancerSheddingIntervalMinutes = 5;
+    private int loadBalancerSheddingIntervalMinutes = 1;
     // Prevent the same topics to be shed and moved to other broker more that
     // once within this timeframe
     private long loadBalancerSheddingGracePeriodMinutes = 30;
@@ -375,7 +375,9 @@
     // Usage threshold to allocate max number of topics to broker
     @FieldContext(dynamic = true)
     private int loadBalancerBrokerMaxTopics = 50000;
+
     // Usage threshold to determine a broker as over-loaded
+    @FieldContext(dynamic = true)
     private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
     // Interval to flush dynamic resource quota to ZooKeeper
     private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
index c5e8f9745..ef2e546fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
-import java.util.Map;
+import com.google.common.collect.Multimap;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 
@@ -29,12 +29,12 @@
 
     /**
      * Recommend that all of the returned bundles be unloaded.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
      *            The service configuration.
      * @return A map from all selected bundles to the brokers on which they 
reside.
      */
-    Map<String, String> findBundlesForUnloading(LoadData loadData, 
ServiceConfiguration conf);
+    Multimap<String, String> findBundlesForUnloading(LoadData loadData, 
ServiceConfiguration conf);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
index cb9d6d038..b096018bd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 import java.util.Map;
 import java.util.TreeSet;
 
@@ -61,7 +63,7 @@ public DeviationShedder() {
     /**
      * Recommend that all of the returned bundles be unloaded based on 
observing excessive standard deviations according
      * to some metric.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
@@ -69,8 +71,8 @@ public DeviationShedder() {
      * @return A map from all selected bundles to the brokers on which they 
reside.
      */
     @Override
-    public Map<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
-        final Map<String, String> result = new HashMap<>();
+    public Multimap<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
+        final Multimap<String, String> result = ArrayListMultimap.create();
         bundleTreeSetCache.clear();
         metricTreeSetCache.clear();
         double sum = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4b72a1070..23f651e71 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,11 @@
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,6 +61,7 @@
 import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -79,10 +85,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 public class ModularLoadManagerImpl implements ModularLoadManager, 
ZooKeeperCacheListener<LocalBrokerData> {
     private static final Logger log = 
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
 
@@ -192,7 +194,7 @@ public ModularLoadManagerImpl() {
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
         loadSheddingPipeline = new ArrayList<>();
-        loadSheddingPipeline.add(new OverloadShedder(conf));
+        loadSheddingPipeline.add(new OverloadShedder());
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -583,28 +585,27 @@ public synchronized void doLoadShedding() {
                 - 
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         recentlyUnloadedBundles.keySet().removeIf(e -> 
recentlyUnloadedBundles.get(e) < timeout);
+
         for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
-            final Map<String, String> bundlesToUnload = 
strategy.findBundlesForUnloading(loadData, conf);
-            if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) {
-                try {
-                    for (Map.Entry<String, String> entry : 
bundlesToUnload.entrySet()) {
-                        final String broker = entry.getKey();
-                        final String bundle = entry.getValue();
-
-                        final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
-                        final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-                        if(!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
-                            continue;
-                        }
-                        log.info("Unloading bundle: {} from broker {}", 
bundle, broker);
+            final Multimap<String, String> bundlesToUnload = 
strategy.findBundlesForUnloading(loadData, conf);
+
+            bundlesToUnload.asMap().forEach((broker, bundles) -> {
+                bundles.forEach(bundle -> {
+                    final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                    final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                    if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
+                        return;
+                    }
+
+                    log.info("[Overload shedder] Unloading bundle: {} from 
broker {}", bundle, broker);
+                    try {
                         
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, 
bundleRange);
                         loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
+                    } catch (PulsarServerException | PulsarAdminException e) {
+                        log.warn("Error when trying to perform load shedding 
on {} for broker {}", bundle, broker, e);
                     }
-                } catch (Exception e) {
-                    log.warn("Error when trying to perform load shedding", e);
-                }
-                return;
-            }
+                });
+            });
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index ed71defd4..972bfc974 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 import java.util.Map;
 
-import org.apache.pulsar.broker.BrokerData;
+import org.apache.bookkeeper.mledger.util.Pair;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
 import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
@@ -40,21 +44,15 @@
  * rate that has not been recently unloaded.
  */
 public class OverloadShedder implements LoadSheddingStrategy {
+
     private static final Logger log = 
LoggerFactory.getLogger(OverloadShedder.class);
-    private Map<String, String> selectedBundlesCache;
 
-    /**
-     * Create an OverloadShedder with the service configuration.
-     *
-     * @param conf
-     *            Service configuration to create from.
-     */
-    public OverloadShedder(final ServiceConfiguration conf) {
-        selectedBundlesCache = new HashMap<>();
-    }
+    private final Multimap<String, String> selectedBundlesCache = 
ArrayListMultimap.create();
+
+    private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
 
     /**
-     * Attempt to shed one bundle off every broker which is overloaded.
+     * Attempt to shed some bundles off every broker which is overloaded.
      *
      * @param loadData
      *            The load data to used to make the unloading decision.
@@ -62,51 +60,73 @@ public OverloadShedder(final ServiceConfiguration conf) {
      *            The service configuration.
      * @return A map from bundles to unload to the brokers on which they are 
loaded.
      */
-    public Map<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
+    public Multimap<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
         selectedBundlesCache.clear();
         final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
-        for (final Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
-            final String broker = entry.getKey();
-            final BrokerData brokerData = entry.getValue();
+
+        // Check every broker and select
+        loadData.getBrokerData().forEach((broker, brokerData) -> {
+
             final LocalBrokerData localData = brokerData.getLocalData();
-            final double maxUsage = localData.getMaxResourceUsage();
-            if (maxUsage >= overloadThreshold) {
-                log.info("Attempting to shed load on {}, which has max 
resource usage {}%", broker, maxUsage);
-                double maxMessageRate = Double.NEGATIVE_INFINITY;
-                String mostTaxingBundle = null;
-                if (localData.getBundles().size() > 1) {
-                    for (final String bundle : localData.getBundles()) {
-                        final BundleData bundleData = 
loadData.getBundleData().get(bundle);
-                        if (bundleData == null) {
-                            continue;
-                        }
-
-                        // Consider short-term message rate to address system 
resource burden
-                        final TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                        final double messageRate = 
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
-                        // The burden of checking the timestamp is for the 
load manager, not the strategy.
-                        if (messageRate > maxMessageRate && 
!recentlyUnloadedBundles.containsKey(bundle)) {
-                            maxMessageRate = messageRate;
-                            mostTaxingBundle = bundle;
-                        }
-                    }
-                    if (mostTaxingBundle != null) {
-                        selectedBundlesCache.put(broker, mostTaxingBundle);
-                    } else {
-                        log.warn("Load shedding could not be performed on 
broker {} because all bundles assigned to it "
-                                + "have recently been unloaded");
-                    }
-                } else if (localData.getBundles().size() == 1) {
-                    log.warn(
-                            "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
-                                    + "No Load Shedding will be done on this 
broker",
-                            localData.getBundles().iterator().next(), broker);
-                } else {
-                    log.warn("Broker {} is overloaded despite having no 
bundles", broker);
+            final double currentUsage = localData.getMaxResourceUsage();
+            if (currentUsage < overloadThreshold) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Broker is not overloaded, ignoring at this 
point", broker);
                 }
+                return;
             }
-        }
+
+            // We want to offload enough traffic such that this broker will go 
below the overload threshold
+            // Also, add a small margin so that this broker won't be very 
close to the threshold edge.
+            double percentOfTrafficToOffload = currentUsage - 
overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+
+            double minimumThroughputToOffload = brokerCurrentThroughput * 
percentOfTrafficToOffload;
+
+            log.info(
+                    "Attempting to shed load on {}, which has max resource 
usage above threshold {}% > {}% -- Offloading at least {} MByte/s of traffic",
+                    broker, currentUsage, overloadThreshold, 
minimumThroughputToOffload / 1024 / 1024);
+
+            MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+            MutableBoolean atLeastOneBundleSelected = new 
MutableBoolean(false);
+
+            if (localData.getBundles().size() > 1) {
+                // Sort bundles by throughput, then pick the biggest N which 
combined make up for at least the minimum throughput to offload
+
+                loadData.getBundleData().entrySet().stream().map((e) -> {
+                    // Map to throughput value
+                    // Consider short-term byte rate to address system 
resource burden
+                    String bundle = e.getKey();
+                    BundleData bundleData = e.getValue();
+                    TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
+                    double throughput = shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
+                    return Pair.create(bundle, throughput);
+                }).filter(e -> {
+                    // Only consider bundles that were not already unloaded 
recently
+                    return !recentlyUnloadedBundles.containsKey(e.first);
+                }).sorted((e1, e2) -> {
+                    // Sort by throughput in reverse order
+                    return Double.compare(e2.second, e1.second);
+                }).forEach(e -> {
+                    if (trafficMarkedToOffload.doubleValue() < 
minimumThroughputToOffload
+                            || atLeastOneBundleSelected.isFalse()) {
+                       selectedBundlesCache.put(broker, e.first);
+                       trafficMarkedToOffload.add(e.second);
+                       atLeastOneBundleSelected.setTrue();
+                   }
+                });
+            } else if (localData.getBundles().size() == 1) {
+                log.warn(
+                        "HIGH USAGE WARNING : Sole namespace bundle {} is 
overloading broker {}. "
+                                + "No Load Shedding will be done on this 
broker",
+                        localData.getBundles().iterator().next(), broker);
+            } else {
+                log.warn("Broker {} is overloaded despite having no bundles", 
broker);
+            }
+
+        });
+
         return selectedBundlesCache;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
new file mode 100644
index 000000000..7b0df5d5e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+public class OverloadShedderTest {
+
+    private final OverloadShedder os = new OverloadShedder();
+    private final ServiceConfiguration conf;
+
+    public OverloadShedderTest() {
+        conf = new ServiceConfiguration();
+        conf.setLoadBalancerBrokerOverloadedThresholdPercentage(85);
+    }
+
+    @Test
+    public void testNoBrokers() {
+        LoadData loadData = new LoadData();
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokersWithNoBundles() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerNotOverloaded() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(500, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(500, 1000));
+        broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+        BundleData bundle1 = new BundleData();
+        TimeAverageMessageData db1 = new TimeAverageMessageData();
+        db1.setMsgThroughputIn(1000);
+        db1.setMsgThroughputOut(1000);
+        bundle1.setShortTermData(db1);
+        loadData.getBundleData().put("bundle-1", bundle1);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerWithSingleBundle() {
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+        broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+        BundleData bundle1 = new BundleData();
+        TimeAverageMessageData db1 = new TimeAverageMessageData();
+        db1.setMsgThroughputIn(1000);
+        db1.setMsgThroughputOut(1000);
+        bundle1.setShortTermData(db1);
+        loadData.getBundleData().put("bundle-1", bundle1);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+    }
+
+    @Test
+    public void testBrokerWithMultipleBundles() {
+        int numBundles = 10;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+        double brokerThroghput = 0;
+
+        for (int i = 1; i <= numBundles; i++) {
+            broker1.getBundles().add("bundle-" + i);
+
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData db = new TimeAverageMessageData();
+
+            double throughput = i * 1024 * 1024;
+            db.setMsgThroughputIn(throughput);
+            db.setMsgThroughputOut(throughput);
+            bundle.setShortTermData(db);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+
+            brokerThroghput += throughput;
+        }
+
+        broker1.setMsgThroughputIn(brokerThroghput);
+        broker1.setMsgThroughputOut(brokerThroghput);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        Multimap<String, String> bundlesToUnload = 
os.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+        assertEquals(bundlesToUnload.get("broker-1"), 
Lists.newArrayList("bundle-10", "bundle-9"));
+    }
+
+    @Test
+    public void testFilterRecentlyUnloaded() {
+        int numBundles = 10;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+        broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+        double brokerThroghput = 0;
+
+        for (int i = 1; i <= numBundles; i++) {
+            broker1.getBundles().add("bundle-" + i);
+
+            BundleData bundle = new BundleData();
+            TimeAverageMessageData db = new TimeAverageMessageData();
+
+            double throughput = i * 1024 * 1024;
+            db.setMsgThroughputIn(throughput);
+            db.setMsgThroughputOut(throughput);
+            bundle.setShortTermData(db);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+
+            brokerThroghput += throughput;
+        }
+
+        broker1.setMsgThroughputIn(brokerThroghput);
+        broker1.setMsgThroughputOut(brokerThroghput);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+        loadData.getRecentlyUnloadedBundles().put("bundle-10", 1L);
+        loadData.getRecentlyUnloadedBundles().put("bundle-9", 1L);
+
+        Multimap<String, String> bundlesToUnload = 
os.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+        assertEquals(bundlesToUnload.get("broker-1"), 
Lists.newArrayList("bundle-8", "bundle-7"));
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 4c754d2fb..129cf5d81 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -108,7 +108,7 @@ public LocalBrokerData(final String webServiceUrl, final 
String webServiceUrlTls
 
     /**
      * Using the system resource usage and bundle stats acquired from the 
Pulsar client, update this LocalBrokerData.
-     * 
+     *
      * @param systemResourceUsage
      *            System resource usage (cpu, memory, and direct memory).
      * @param bundleStats
@@ -123,7 +123,7 @@ public void update(final SystemResourceUsage 
systemResourceUsage,
 
     /**
      * Using another LocalBrokerData, update this.
-     * 
+     *
      * @param other
      *            LocalBrokerData to update from.
      */
@@ -196,10 +196,20 @@ private void updateBundleData(final Map<String, 
NamespaceBundleStats> bundleStat
     }
 
     public double getMaxResourceUsage() {
-        return Math
-                .max(Math.max(Math.max(cpu.percentUsage(), 
memory.percentUsage()),
-                        Math.max(directMemory.percentUsage(), 
bandwidthIn.percentUsage())), bandwidthOut.percentUsage())
-                / 100;
+        return max(cpu.percentUsage(), memory.percentUsage(), 
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    private static float max(float...args) {
+        float max = Float.NEGATIVE_INFINITY;
+
+        for (float d : args) {
+            if (d > max) {
+                max = d;
+            }
+        }
+
+        return max;
     }
 
     public String getLoadReportType() {
@@ -392,7 +402,7 @@ public String getPulsarServiceUrl() {
     public String getPulsarServiceUrlTls() {
         return pulsarServiceUrlTls;
     }
-    
+
     @Override
     public boolean isPersistentTopicsEnabled() {
         return persistentTopicsEnabled;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to