This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f6cc450 Load manager should offload multiple bundles when overloaded (#1488) f6cc450 is described below commit f6cc45037c215ef1085b4a0bfc7e3a3fd7ff9f1c Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Apr 4 08:53:37 2018 -0700 Load manager should offload multiple bundles when overloaded (#1488) * Load manager should offload multiple bundles when overloaded * Added test * Fixed import * Fixed logger import --- conf/broker.conf | 9 +- conf/standalone.conf | 4 +- .../apache/pulsar/broker/ServiceConfiguration.java | 4 +- .../broker/loadbalance/LoadSheddingStrategy.java | 6 +- .../broker/loadbalance/impl/DeviationShedder.java | 10 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 47 +++--- .../broker/loadbalance/impl/OverloadShedder.java | 124 ++++++++------ .../loadbalance/impl/OverloadShedderTest.java | 182 +++++++++++++++++++++ .../data/loadbalancer/LocalBrokerData.java | 24 ++- 9 files changed, 313 insertions(+), 97 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index cec3c0f..d78fbbc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -189,7 +189,7 @@ maxConsumersPerSubscription=0 proxyRoles= # If this flag is set then the broker authenticates the original Auth data -# else it just accepts the originalPrincipal and authorizes it (if required). +# else it just accepts the originalPrincipal and authorizes it (if required). authenticateOriginalAuthData=false # Enable TLS @@ -356,7 +356,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1 # Load shedding interval. 
Broker periodically checks whether some traffic should be offload from # some over-loaded broker to other under-loaded brokers -loadBalancerSheddingIntervalMinutes=5 +loadBalancerSheddingIntervalMinutes=1 # Prevent the same topics to be shed and moved to other broker more that once within this timeframe loadBalancerSheddingGracePeriodMinutes=30 @@ -388,11 +388,11 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100 # maximum number of bundles in a namespace loadBalancerNamespaceMaximumBundles=128 -# Override the auto-detection of the network interfaces max speed. +# Override the auto-detection of the network interfaces max speed. # This option is useful in some environments (eg: EC2 VMs) where the max speed # reported by Linux is not reflecting the real bandwidth available to the broker. # Since the network usage is employed by the load manager to decide when a broker -# is overloaded, it is important to make sure the info is correct or override it +# is overloaded, it is important to make sure the info is correct or override it # with the right value here. The configured value can be a double (eg: 0.8) and that # can be used to trigger load-shedding even before hitting on NIC limits. loadBalancerOverrideBrokerNicSpeedGbps= @@ -452,4 +452,3 @@ exposeTopicLevelMetricsInPrometheus=true # Enable Functions Worker Service in Broker functionsWorkerEnabled=false - diff --git a/conf/standalone.conf b/conf/standalone.conf index dc6f1b7..b593f00 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -172,7 +172,7 @@ maxConsumersPerSubscription=0 proxyRoles= # If this flag is set then the broker authenticates the original Auth data -# else it just accepts the originalPrincipal and authorizes it (if required). +# else it just accepts the originalPrincipal and authorizes it (if required). authenticateOriginalAuthData=false # Enable authentication @@ -318,7 +318,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1 # Load shedding interval. 
Broker periodically checks whether some traffic should be offload from # some over-loaded broker to other under-loaded brokers -loadBalancerSheddingIntervalMinutes=5 +loadBalancerSheddingIntervalMinutes=1 # Prevent the same topics to be shed and moved to other broker more that once within this timeframe loadBalancerSheddingGracePeriodMinutes=30 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2e09ba9..dd9cabd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -365,7 +365,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean loadBalancerSheddingEnabled = true; // Load shedding interval. Broker periodically checks whether some traffic should be offload from some over-loaded // broker to other under-loaded brokers - private int loadBalancerSheddingIntervalMinutes = 5; + private int loadBalancerSheddingIntervalMinutes = 1; // Prevent the same topics to be shed and moved to other broker more that // once within this timeframe private long loadBalancerSheddingGracePeriodMinutes = 30; @@ -375,7 +375,9 @@ public class ServiceConfiguration implements PulsarConfiguration { // Usage threshold to allocate max number of topics to broker @FieldContext(dynamic = true) private int loadBalancerBrokerMaxTopics = 50000; + // Usage threshold to determine a broker as over-loaded + @FieldContext(dynamic = true) private int loadBalancerBrokerOverloadedThresholdPercentage = 85; // Interval to flush dynamic resource quota to ZooKeeper private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java index c5e8f97..ef2e546 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; -import java.util.Map; +import com.google.common.collect.Multimap; import org.apache.pulsar.broker.ServiceConfiguration; @@ -29,12 +29,12 @@ public interface LoadSheddingStrategy { /** * Recommend that all of the returned bundles be unloaded. - * + * * @param loadData * The load data to used to make the unloading decision. * @param conf * The service configuration. * @return A map from all selected bundles to the brokers on which they reside. */ - Map<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf); + Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java index cb9d6d0..b096018 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import java.util.HashMap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + import java.util.Map; import java.util.TreeSet; @@ -61,7 +63,7 @@ public abstract class DeviationShedder implements LoadSheddingStrategy { /** * Recommend that all of the returned bundles be unloaded based on observing excessive standard deviations according * to some metric. 
- * + * * @param loadData * The load data to used to make the unloading decision. * @param conf @@ -69,8 +71,8 @@ public abstract class DeviationShedder implements LoadSheddingStrategy { * @return A map from all selected bundles to the brokers on which they reside. */ @Override - public Map<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { - final Map<String, String> result = new HashMap<>(); + public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { + final Multimap<String, String> result = ArrayListMultimap.create(); bundleTreeSetCache.clear(); metricTreeSetCache.clear(); double sum = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 4b72a10..23f651e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -22,6 +22,11 @@ import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.web.PulsarWebResource.path; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; + +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -56,6 +61,7 @@ import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; import org.apache.pulsar.broker.loadbalance.ModularLoadManager; import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import org.apache.pulsar.client.admin.PulsarAdminException; 
import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -79,10 +85,6 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - -import io.netty.util.concurrent.DefaultThreadFactory; - public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> { private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class); @@ -192,7 +194,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach filterPipeline = new ArrayList<>(); loadData = new LoadData(); loadSheddingPipeline = new ArrayList<>(); - loadSheddingPipeline.add(new OverloadShedder(conf)); + loadSheddingPipeline.add(new OverloadShedder()); preallocatedBundleToBroker = new ConcurrentHashMap<>(); scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = Maps.newHashMap(); @@ -583,28 +585,27 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes()); final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout); + for (LoadSheddingStrategy strategy : loadSheddingPipeline) { - final Map<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf); - if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) { - try { - for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) { - final String broker = entry.getKey(); - final String bundle = entry.getValue(); - - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle); - if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { - continue; - } - log.info("Unloading bundle: {} from broker {}", bundle, broker); + final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf); + + bundlesToUnload.asMap().forEach((broker, bundles) -> { + bundles.forEach(bundle -> { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { + return; + } + + log.info("[Overload shedder] Unloading bundle: {} from broker {}", bundle, broker); + try { pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange); loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); + } catch (PulsarServerException | PulsarAdminException e) { + log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e); } - } catch (Exception e) { - log.warn("Error when trying to perform load shedding", e); - } - return; - } + }); + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index ed71def..972bfc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -18,10 +18,14 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import java.util.HashMap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + import java.util.Map; -import org.apache.pulsar.broker.BrokerData; +import org.apache.bookkeeper.mledger.util.Pair; +import 
org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableDouble; import org.apache.pulsar.broker.BundleData; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TimeAverageMessageData; @@ -40,21 +44,15 @@ import org.slf4j.LoggerFactory; * rate that has not been recently unloaded. */ public class OverloadShedder implements LoadSheddingStrategy { + private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class); - private Map<String, String> selectedBundlesCache; - /** - * Create an OverloadShedder with the service configuration. - * - * @param conf - * Service configuration to create from. - */ - public OverloadShedder(final ServiceConfiguration conf) { - selectedBundlesCache = new HashMap<>(); - } + private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create(); + + private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05; /** - * Attempt to shed one bundle off every broker which is overloaded. + * Attempt to shed some bundles off every broker which is overloaded. * * @param loadData * The load data to used to make the unloading decision. @@ -62,51 +60,73 @@ public class OverloadShedder implements LoadSheddingStrategy { * The service configuration. * @return A map from bundles to unload to the brokers on which they are loaded. 
*/ - public Map<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { + public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { selectedBundlesCache.clear(); final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); - for (final Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) { - final String broker = entry.getKey(); - final BrokerData brokerData = entry.getValue(); + + // Check every broker and select + loadData.getBrokerData().forEach((broker, brokerData) -> { + final LocalBrokerData localData = brokerData.getLocalData(); - final double maxUsage = localData.getMaxResourceUsage(); - if (maxUsage >= overloadThreshold) { - log.info("Attempting to shed load on {}, which has max resource usage {}%", broker, maxUsage); - double maxMessageRate = Double.NEGATIVE_INFINITY; - String mostTaxingBundle = null; - if (localData.getBundles().size() > 1) { - for (final String bundle : localData.getBundles()) { - final BundleData bundleData = loadData.getBundleData().get(bundle); - if (bundleData == null) { - continue; - } - - // Consider short-term message rate to address system resource burden - final TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut(); - // The burden of checking the timestamp is for the load manager, not the strategy. 
- if (messageRate > maxMessageRate && !recentlyUnloadedBundles.containsKey(bundle)) { - maxMessageRate = messageRate; - mostTaxingBundle = bundle; - } - } - if (mostTaxingBundle != null) { - selectedBundlesCache.put(broker, mostTaxingBundle); - } else { - log.warn("Load shedding could not be performed on broker {} because all bundles assigned to it " - + "have recently been unloaded"); - } - } else if (localData.getBundles().size() == 1) { - log.warn( - "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " - + "No Load Shedding will be done on this broker", - localData.getBundles().iterator().next(), broker); - } else { - log.warn("Broker {} is overloaded despite having no bundles", broker); + final double currentUsage = localData.getMaxResourceUsage(); + if (currentUsage < overloadThreshold) { + if (log.isDebugEnabled()) { + log.debug("[{}] Broker is not overloaded, ignoring at this point", broker); } + return; } - } + + // We want to offload enough traffic such that this broker will go below the overload threshold + // Also, add a small margin so that this broker won't be very close to the threshold edge. 
+ double percentOfTrafficToOffload = currentUsage - overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN; + double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut(); + + double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload; + + log.info( + "Attempting to shed load on {}, which has max resource usage above threshold {}% > {}% -- Offloading at least {} MByte/s of traffic", + broker, currentUsage, overloadThreshold, minimumThroughputToOffload / 1024 / 1024); + + MutableDouble trafficMarkedToOffload = new MutableDouble(0); + MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false); + + if (localData.getBundles().size() > 1) { + // Sort bundles by throughput, then pick the biggest N which combined make up for at least the minimum throughput to offload + + loadData.getBundleData().entrySet().stream().map((e) -> { + // Map to throughput value + // Consider short-term byte rate to address system resource burden + String bundle = e.getKey(); + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.create(bundle, throughput); + }).filter(e -> { + // Only consider bundles that were not already unloaded recently + return !recentlyUnloadedBundles.containsKey(e.first); + }).sorted((e1, e2) -> { + // Sort by throughput in reverse order + return Double.compare(e2.second, e1.second); + }).forEach(e -> { + if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload + || atLeastOneBundleSelected.isFalse()) { + selectedBundlesCache.put(broker, e.first); + trafficMarkedToOffload.add(e.second); + atLeastOneBundleSelected.setTrue(); + } + }); + } else if (localData.getBundles().size() == 1) { + log.warn( + "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. 
" + + "No Load Shedding will be done on this broker", + localData.getBundles().iterator().next(), broker); + } else { + log.warn("Broker {} is overloaded despite having no bundles", broker); + } + + }); + return selectedBundlesCache; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java new file mode 100644 index 0000000..7b0df5d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +import org.apache.pulsar.broker.BrokerData; +import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TimeAverageMessageData; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.testng.annotations.Test; + +public class OverloadShedderTest { + + private final OverloadShedder os = new OverloadShedder(); + private final ServiceConfiguration conf; + + public OverloadShedderTest() { + conf = new ServiceConfiguration(); + conf.setLoadBalancerBrokerOverloadedThresholdPercentage(85); + } + + @Test + public void testNoBrokers() { + LoadData loadData = new LoadData(); + assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty()); + } + + @Test + public void testBrokersWithNoBundles() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(999, 1000)); + broker1.setBandwidthOut(new ResourceUsage(999, 1000)); + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty()); + } + + @Test + public void testBrokerNotOverloaded() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(500, 1000)); + broker1.setBandwidthOut(new ResourceUsage(500, 1000)); + broker1.setBundles(Sets.newHashSet("bundle-1")); + + BundleData bundle1 = new BundleData(); + TimeAverageMessageData db1 = new 
TimeAverageMessageData(); + db1.setMsgThroughputIn(1000); + db1.setMsgThroughputOut(1000); + bundle1.setShortTermData(db1); + loadData.getBundleData().put("bundle-1", bundle1); + + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty()); + } + + @Test + public void testBrokerWithSingleBundle() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(999, 1000)); + broker1.setBandwidthOut(new ResourceUsage(999, 1000)); + broker1.setBundles(Sets.newHashSet("bundle-1")); + + BundleData bundle1 = new BundleData(); + TimeAverageMessageData db1 = new TimeAverageMessageData(); + db1.setMsgThroughputIn(1000); + db1.setMsgThroughputOut(1000); + bundle1.setShortTermData(db1); + loadData.getBundleData().put("bundle-1", bundle1); + + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty()); + } + + @Test + public void testBrokerWithMultipleBundles() { + int numBundles = 10; + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(999, 1000)); + broker1.setBandwidthOut(new ResourceUsage(999, 1000)); + + double brokerThroghput = 0; + + for (int i = 1; i <= numBundles; i++) { + broker1.getBundles().add("bundle-" + i); + + BundleData bundle = new BundleData(); + TimeAverageMessageData db = new TimeAverageMessageData(); + + double throughput = i * 1024 * 1024; + db.setMsgThroughputIn(throughput); + db.setMsgThroughputOut(throughput); + bundle.setShortTermData(db); + loadData.getBundleData().put("bundle-" + i, bundle); + + brokerThroghput += throughput; + } + + broker1.setMsgThroughputIn(brokerThroghput); + broker1.setMsgThroughputOut(brokerThroghput); + + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + Multimap<String, String> bundlesToUnload = 
os.findBundlesForUnloading(loadData, conf); + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker-1"), Lists.newArrayList("bundle-10", "bundle-9")); + } + + @Test + public void testFilterRecentlyUnloaded() { + int numBundles = 10; + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(999, 1000)); + broker1.setBandwidthOut(new ResourceUsage(999, 1000)); + + double brokerThroghput = 0; + + for (int i = 1; i <= numBundles; i++) { + broker1.getBundles().add("bundle-" + i); + + BundleData bundle = new BundleData(); + TimeAverageMessageData db = new TimeAverageMessageData(); + + double throughput = i * 1024 * 1024; + db.setMsgThroughputIn(throughput); + db.setMsgThroughputOut(throughput); + bundle.setShortTermData(db); + loadData.getBundleData().put("bundle-" + i, bundle); + + brokerThroghput += throughput; + } + + broker1.setMsgThroughputIn(brokerThroghput); + broker1.setMsgThroughputOut(brokerThroghput); + + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + loadData.getRecentlyUnloadedBundles().put("bundle-10", 1L); + loadData.getRecentlyUnloadedBundles().put("bundle-9", 1L); + + Multimap<String, String> bundlesToUnload = os.findBundlesForUnloading(loadData, conf); + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker-1"), Lists.newArrayList("bundle-8", "bundle-7")); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java index 4c754d2..129cf5d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java @@ -108,7 +108,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { /** * Using 
the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData. - * + * * @param systemResourceUsage * System resource usage (cpu, memory, and direct memory). * @param bundleStats @@ -123,7 +123,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { /** * Using another LocalBrokerData, update this. - * + * * @param other * LocalBrokerData to update from. */ @@ -196,10 +196,20 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { } public double getMaxResourceUsage() { - return Math - .max(Math.max(Math.max(cpu.percentUsage(), memory.percentUsage()), - Math.max(directMemory.percentUsage(), bandwidthIn.percentUsage())), bandwidthOut.percentUsage()) - / 100; + return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(), + bandwidthOut.percentUsage()) / 100; + } + + private static float max(float...args) { + float max = Float.NEGATIVE_INFINITY; + + for (float d : args) { + if (d > max) { + max = d; + } + } + + return max; } public String getLoadReportType() { @@ -392,7 +402,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { public String getPulsarServiceUrlTls() { return pulsarServiceUrlTls; } - + @Override public boolean isPersistentTopicsEnabled() { return persistentTopicsEnabled; -- To stop receiving notification emails like this one, please contact mme...@apache.org.