This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f6cc450 Load manager should offload multiple bundles when overloaded
(#1488)
f6cc450 is described below
commit f6cc45037c215ef1085b4a0bfc7e3a3fd7ff9f1c
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 4 08:53:37 2018 -0700
Load manager should offload multiple bundles when overloaded (#1488)
* Load manager should offload multiple bundles when overloaded
* Added test
* Fixed import
* Fixed logger import
---
conf/broker.conf | 9 +-
conf/standalone.conf | 4 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 4 +-
.../broker/loadbalance/LoadSheddingStrategy.java | 6 +-
.../broker/loadbalance/impl/DeviationShedder.java | 10 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 47 +++---
.../broker/loadbalance/impl/OverloadShedder.java | 124 ++++++++------
.../loadbalance/impl/OverloadShedderTest.java | 182 +++++++++++++++++++++
.../data/loadbalancer/LocalBrokerData.java | 24 ++-
9 files changed, 313 insertions(+), 97 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index cec3c0f..d78fbbc 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -189,7 +189,7 @@ maxConsumersPerSubscription=0
proxyRoles=
# If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).
+# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false
# Enable TLS
@@ -356,7 +356,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic
should be offloaded from
# some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
# Prevent the same topics from being shed and moved to another broker more than once
within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
@@ -388,11 +388,11 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128
-# Override the auto-detection of the network interfaces max speed.
+# Override the auto-detection of the network interfaces max speed.
# This option is useful in some environments (eg: EC2 VMs) where the max speed
# reported by Linux is not reflecting the real bandwidth available to the
broker.
# Since the network usage is employed by the load manager to decide when a
broker
-# is overloaded, it is important to make sure the info is correct or override
it
+# is overloaded, it is important to make sure the info is correct or override
it
# with the right value here. The configured value can be a double (eg: 0.8)
and that
# can be used to trigger load-shedding even before hitting on NIC limits.
loadBalancerOverrideBrokerNicSpeedGbps=
@@ -452,4 +452,3 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false
-
diff --git a/conf/standalone.conf b/conf/standalone.conf
index dc6f1b7..b593f00 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -172,7 +172,7 @@ maxConsumersPerSubscription=0
proxyRoles=
# If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).
+# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false
# Enable authentication
@@ -318,7 +318,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic
should be offloaded from
# some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
# Prevent the same topics from being shed and moved to another broker more than once
within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e09ba9..dd9cabd 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -365,7 +365,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private boolean loadBalancerSheddingEnabled = true;
// Load shedding interval. Broker periodically checks whether some traffic
should be offloaded from some over-loaded
// broker to other under-loaded brokers
- private int loadBalancerSheddingIntervalMinutes = 5;
+ private int loadBalancerSheddingIntervalMinutes = 1;
// Prevent the same topics from being shed and moved to another broker more than
// once within this timeframe
private long loadBalancerSheddingGracePeriodMinutes = 30;
@@ -375,7 +375,9 @@ public class ServiceConfiguration implements
PulsarConfiguration {
// Usage threshold to allocate max number of topics to broker
@FieldContext(dynamic = true)
private int loadBalancerBrokerMaxTopics = 50000;
+
// Usage threshold to determine a broker as over-loaded
+ @FieldContext(dynamic = true)
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
// Interval to flush dynamic resource quota to ZooKeeper
private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
index c5e8f97..ef2e546 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
-import java.util.Map;
+import com.google.common.collect.Multimap;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -29,12 +29,12 @@ public interface LoadSheddingStrategy {
/**
* Recommend that all of the returned bundles be unloaded.
- *
+ *
* @param loadData
* The load data to be used to make the unloading decision.
* @param conf
* The service configuration.
* @return A multimap from each broker to the bundles selected for unloading
from it.
*/
- Map<String, String> findBundlesForUnloading(LoadData loadData,
ServiceConfiguration conf);
+ Multimap<String, String> findBundlesForUnloading(LoadData loadData,
ServiceConfiguration conf);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
index cb9d6d0..b096018 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
import java.util.Map;
import java.util.TreeSet;
@@ -61,7 +63,7 @@ public abstract class DeviationShedder implements
LoadSheddingStrategy {
/**
* Recommend that all of the returned bundles be unloaded based on
observing excessive standard deviations according
* to some metric.
- *
+ *
* @param loadData
* The load data to be used to make the unloading decision.
* @param conf
@@ -69,8 +71,8 @@ public abstract class DeviationShedder implements
LoadSheddingStrategy {
* @return A map from all selected bundles to the brokers on which they
reside.
*/
@Override
- public Map<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
- final Map<String, String> result = new HashMap<>();
+ public Multimap<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
+ final Multimap<String, String> result = ArrayListMultimap.create();
bundleTreeSetCache.clear();
metricTreeSetCache.clear();
double sum = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4b72a10..23f651e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,11 @@ import static
org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -56,6 +61,7 @@ import
org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -79,10 +85,6 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
public class ModularLoadManagerImpl implements ModularLoadManager,
ZooKeeperCacheListener<LocalBrokerData> {
private static final Logger log =
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
@@ -192,7 +194,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, ZooKeeperCach
filterPipeline = new ArrayList<>();
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
- loadSheddingPipeline.add(new OverloadShedder(conf));
+ loadSheddingPipeline.add(new OverloadShedder());
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -583,28 +585,27 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, ZooKeeperCach
-
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
recentlyUnloadedBundles.keySet().removeIf(e ->
recentlyUnloadedBundles.get(e) < timeout);
+
for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
- final Map<String, String> bundlesToUnload =
strategy.findBundlesForUnloading(loadData, conf);
- if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) {
- try {
- for (Map.Entry<String, String> entry :
bundlesToUnload.entrySet()) {
- final String broker = entry.getKey();
- final String bundle = entry.getValue();
-
- final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
- final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
- if(!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
- continue;
- }
- log.info("Unloading bundle: {} from broker {}",
bundle, broker);
+ final Multimap<String, String> bundlesToUnload =
strategy.findBundlesForUnloading(loadData, conf);
+
+ bundlesToUnload.asMap().forEach((broker, bundles) -> {
+ bundles.forEach(bundle -> {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
+ return;
+ }
+
+ log.info("[Overload shedder] Unloading bundle: {} from
broker {}", bundle, broker);
+ try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
+ } catch (PulsarServerException | PulsarAdminException e) {
+ log.warn("Error when trying to perform load shedding
on {} for broker {}", bundle, broker, e);
}
- } catch (Exception e) {
- log.warn("Error when trying to perform load shedding", e);
- }
- return;
- }
+ });
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index ed71def..972bfc9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,10 +18,14 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
import java.util.Map;
-import org.apache.pulsar.broker.BrokerData;
+import org.apache.bookkeeper.mledger.util.Pair;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
@@ -40,21 +44,15 @@ import org.slf4j.LoggerFactory;
* rate that has not been recently unloaded.
*/
public class OverloadShedder implements LoadSheddingStrategy {
+
private static final Logger log =
LoggerFactory.getLogger(OverloadShedder.class);
- private Map<String, String> selectedBundlesCache;
- /**
- * Create an OverloadShedder with the service configuration.
- *
- * @param conf
- * Service configuration to create from.
- */
- public OverloadShedder(final ServiceConfiguration conf) {
- selectedBundlesCache = new HashMap<>();
- }
+ private final Multimap<String, String> selectedBundlesCache =
ArrayListMultimap.create();
+
+ private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
/**
- * Attempt to shed one bundle off every broker which is overloaded.
+ * Attempt to shed some bundles off every broker which is overloaded.
*
* @param loadData
* The load data to be used to make the unloading decision.
@@ -62,51 +60,73 @@ public class OverloadShedder implements
LoadSheddingStrategy {
* The service configuration.
* @return A multimap from each overloaded broker to the bundles selected for
unloading from it.
*/
- public Map<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
+ public Multimap<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
- for (final Map.Entry<String, BrokerData> entry :
loadData.getBrokerData().entrySet()) {
- final String broker = entry.getKey();
- final BrokerData brokerData = entry.getValue();
+
+ // Check every broker and select the bundles to offload from the overloaded ones
+ loadData.getBrokerData().forEach((broker, brokerData) -> {
+
final LocalBrokerData localData = brokerData.getLocalData();
- final double maxUsage = localData.getMaxResourceUsage();
- if (maxUsage >= overloadThreshold) {
- log.info("Attempting to shed load on {}, which has max
resource usage {}%", broker, maxUsage);
- double maxMessageRate = Double.NEGATIVE_INFINITY;
- String mostTaxingBundle = null;
- if (localData.getBundles().size() > 1) {
- for (final String bundle : localData.getBundles()) {
- final BundleData bundleData =
loadData.getBundleData().get(bundle);
- if (bundleData == null) {
- continue;
- }
-
- // Consider short-term message rate to address system
resource burden
- final TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- final double messageRate =
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
- // The burden of checking the timestamp is for the
load manager, not the strategy.
- if (messageRate > maxMessageRate &&
!recentlyUnloadedBundles.containsKey(bundle)) {
- maxMessageRate = messageRate;
- mostTaxingBundle = bundle;
- }
- }
- if (mostTaxingBundle != null) {
- selectedBundlesCache.put(broker, mostTaxingBundle);
- } else {
- log.warn("Load shedding could not be performed on
broker {} because all bundles assigned to it "
- + "have recently been unloaded");
- }
- } else if (localData.getBundles().size() == 1) {
- log.warn(
- "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
- + "No Load Shedding will be done on this
broker",
- localData.getBundles().iterator().next(), broker);
- } else {
- log.warn("Broker {} is overloaded despite having no
bundles", broker);
+ final double currentUsage = localData.getMaxResourceUsage();
+ if (currentUsage < overloadThreshold) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Broker is not overloaded, ignoring at this
point", broker);
}
+ return;
}
- }
+
+ // We want to offload enough traffic such that this broker will go
below the overload threshold
+ // Also, add a small margin so that this broker won't be very
close to the threshold edge.
+ double percentOfTrafficToOffload = currentUsage -
overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() +
localData.getMsgThroughputOut();
+
+ double minimumThroughputToOffload = brokerCurrentThroughput *
percentOfTrafficToOffload;
+
+ log.info(
+ "Attempting to shed load on {}, which has max resource
usage above threshold {}% > {}% -- Offloading at least {} MByte/s of traffic",
+ broker, currentUsage, overloadThreshold,
minimumThroughputToOffload / 1024 / 1024);
+
+ MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+ MutableBoolean atLeastOneBundleSelected = new
MutableBoolean(false);
+
+ if (localData.getBundles().size() > 1) {
+ // Sort bundles by throughput, then pick the biggest N which
combined make up for at least the minimum throughput to offload
+
+ loadData.getBundleData().entrySet().stream().map((e) -> {
+ // Map to throughput value
+ // Consider short-term byte rate to address system
resource burden
+ String bundle = e.getKey();
+ BundleData bundleData = e.getValue();
+ TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
+ double throughput = shortTermData.getMsgThroughputIn() +
shortTermData.getMsgThroughputOut();
+ return Pair.create(bundle, throughput);
+ }).filter(e -> {
+ // Only consider bundles that were not already unloaded
recently
+ return !recentlyUnloadedBundles.containsKey(e.first);
+ }).sorted((e1, e2) -> {
+ // Sort by throughput in reverse order
+ return Double.compare(e2.second, e1.second);
+ }).forEach(e -> {
+ if (trafficMarkedToOffload.doubleValue() <
minimumThroughputToOffload
+ || atLeastOneBundleSelected.isFalse()) {
+ selectedBundlesCache.put(broker, e.first);
+ trafficMarkedToOffload.add(e.second);
+ atLeastOneBundleSelected.setTrue();
+ }
+ });
+ } else if (localData.getBundles().size() == 1) {
+ log.warn(
+ "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
+ + "No Load Shedding will be done on this
broker",
+ localData.getBundles().iterator().next(), broker);
+ } else {
+ log.warn("Broker {} is overloaded despite having no bundles",
broker);
+ }
+
+ });
+
return selectedBundlesCache;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
new file mode 100644
index 0000000..7b0df5d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+public class OverloadShedderTest {
+
+ private final OverloadShedder os = new OverloadShedder();
+ private final ServiceConfiguration conf;
+
+ public OverloadShedderTest() {
+ conf = new ServiceConfiguration();
+ conf.setLoadBalancerBrokerOverloadedThresholdPercentage(85);
+ }
+
+ @Test
+ public void testNoBrokers() {
+ LoadData loadData = new LoadData();
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokersWithNoBundles() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerNotOverloaded() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(500, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(500, 1000));
+ broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+ BundleData bundle1 = new BundleData();
+ TimeAverageMessageData db1 = new TimeAverageMessageData();
+ db1.setMsgThroughputIn(1000);
+ db1.setMsgThroughputOut(1000);
+ bundle1.setShortTermData(db1);
+ loadData.getBundleData().put("bundle-1", bundle1);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerWithSingleBundle() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+ broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+ BundleData bundle1 = new BundleData();
+ TimeAverageMessageData db1 = new TimeAverageMessageData();
+ db1.setMsgThroughputIn(1000);
+ db1.setMsgThroughputOut(1000);
+ bundle1.setShortTermData(db1);
+ loadData.getBundleData().put("bundle-1", bundle1);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerWithMultipleBundles() {
+ int numBundles = 10;
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+ double brokerThroghput = 0;
+
+ for (int i = 1; i <= numBundles; i++) {
+ broker1.getBundles().add("bundle-" + i);
+
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData db = new TimeAverageMessageData();
+
+ double throughput = i * 1024 * 1024;
+ db.setMsgThroughputIn(throughput);
+ db.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(db);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+
+ brokerThroghput += throughput;
+ }
+
+ broker1.setMsgThroughputIn(brokerThroghput);
+ broker1.setMsgThroughputOut(brokerThroghput);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ Multimap<String, String> bundlesToUnload =
os.findBundlesForUnloading(loadData, conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ assertEquals(bundlesToUnload.get("broker-1"),
Lists.newArrayList("bundle-10", "bundle-9"));
+ }
+
+ @Test
+ public void testFilterRecentlyUnloaded() {
+ int numBundles = 10;
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+ double brokerThroghput = 0;
+
+ for (int i = 1; i <= numBundles; i++) {
+ broker1.getBundles().add("bundle-" + i);
+
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData db = new TimeAverageMessageData();
+
+ double throughput = i * 1024 * 1024;
+ db.setMsgThroughputIn(throughput);
+ db.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(db);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+
+ brokerThroghput += throughput;
+ }
+
+ broker1.setMsgThroughputIn(brokerThroghput);
+ broker1.setMsgThroughputOut(brokerThroghput);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ loadData.getRecentlyUnloadedBundles().put("bundle-10", 1L);
+ loadData.getRecentlyUnloadedBundles().put("bundle-9", 1L);
+
+ Multimap<String, String> bundlesToUnload =
os.findBundlesForUnloading(loadData, conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ assertEquals(bundlesToUnload.get("broker-1"),
Lists.newArrayList("bundle-8", "bundle-7"));
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 4c754d2..129cf5d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -108,7 +108,7 @@ public class LocalBrokerData extends JSONWritable
implements LoadManagerReport {
/**
* Using the system resource usage and bundle stats acquired from the
Pulsar client, update this LocalBrokerData.
- *
+ *
* @param systemResourceUsage
* System resource usage (cpu, memory, and direct memory).
* @param bundleStats
@@ -123,7 +123,7 @@ public class LocalBrokerData extends JSONWritable
implements LoadManagerReport {
/**
* Using another LocalBrokerData, update this.
- *
+ *
* @param other
* LocalBrokerData to update from.
*/
@@ -196,10 +196,20 @@ public class LocalBrokerData extends JSONWritable
implements LoadManagerReport {
}
public double getMaxResourceUsage() {
- return Math
- .max(Math.max(Math.max(cpu.percentUsage(),
memory.percentUsage()),
- Math.max(directMemory.percentUsage(),
bandwidthIn.percentUsage())), bandwidthOut.percentUsage())
- / 100;
+ return max(cpu.percentUsage(), memory.percentUsage(),
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+ bandwidthOut.percentUsage()) / 100;
+ }
+
+ private static float max(float...args) {
+ float max = Float.NEGATIVE_INFINITY;
+
+ for (float d : args) {
+ if (d > max) {
+ max = d;
+ }
+ }
+
+ return max;
}
public String getLoadReportType() {
@@ -392,7 +402,7 @@ public class LocalBrokerData extends JSONWritable
implements LoadManagerReport {
public String getPulsarServiceUrlTls() {
return pulsarServiceUrlTls;
}
-
+
@Override
public boolean isPersistentTopicsEnabled() {
return persistentTopicsEnabled;
--
To stop receiving notification emails like this one, please contact
[email protected].