merlimat closed pull request #1488: Load manager should offload multiple
bundles when overloaded
URL: https://github.com/apache/incubator-pulsar/pull/1488
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, it is
reproduced below for the sake of provenance. This is a foreign pull request
(from a fork), so GitHub would not otherwise show the diff; it is supplied
in full below:
diff --git a/conf/broker.conf b/conf/broker.conf
index cec3c0fc9..d78fbbc02 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -189,7 +189,7 @@ maxConsumersPerSubscription=0
proxyRoles=
# If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).
+# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false
# Enable TLS
@@ -356,7 +356,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic
should be offload from
# some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
# Prevent the same topics to be shed and moved to other broker more that once
within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
@@ -388,11 +388,11 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128
-# Override the auto-detection of the network interfaces max speed.
+# Override the auto-detection of the network interfaces max speed.
# This option is useful in some environments (eg: EC2 VMs) where the max speed
# reported by Linux is not reflecting the real bandwidth available to the
broker.
# Since the network usage is employed by the load manager to decide when a
broker
-# is overloaded, it is important to make sure the info is correct or override
it
+# is overloaded, it is important to make sure the info is correct or override
it
# with the right value here. The configured value can be a double (eg: 0.8)
and that
# can be used to trigger load-shedding even before hitting on NIC limits.
loadBalancerOverrideBrokerNicSpeedGbps=
@@ -452,4 +452,3 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false
-
diff --git a/conf/standalone.conf b/conf/standalone.conf
index dc6f1b743..b593f00d6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -172,7 +172,7 @@ maxConsumersPerSubscription=0
proxyRoles=
# If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).
+# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false
# Enable authentication
@@ -318,7 +318,7 @@ loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic
should be offload from
# some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=5
+loadBalancerSheddingIntervalMinutes=1
# Prevent the same topics to be shed and moved to other broker more that once
within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e09ba9ad..dd9cabdc3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -365,7 +365,7 @@
private boolean loadBalancerSheddingEnabled = true;
// Load shedding interval. Broker periodically checks whether some traffic
should be offload from some over-loaded
// broker to other under-loaded brokers
- private int loadBalancerSheddingIntervalMinutes = 5;
+ private int loadBalancerSheddingIntervalMinutes = 1;
// Prevent the same topics to be shed and moved to other broker more that
// once within this timeframe
private long loadBalancerSheddingGracePeriodMinutes = 30;
@@ -375,7 +375,9 @@
// Usage threshold to allocate max number of topics to broker
@FieldContext(dynamic = true)
private int loadBalancerBrokerMaxTopics = 50000;
+
// Usage threshold to determine a broker as over-loaded
+ @FieldContext(dynamic = true)
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
// Interval to flush dynamic resource quota to ZooKeeper
private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
index c5e8f9745..ef2e546fa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
-import java.util.Map;
+import com.google.common.collect.Multimap;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -29,12 +29,12 @@
/**
* Recommend that all of the returned bundles be unloaded.
- *
+ *
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
* The service configuration.
* @return A map from all selected bundles to the brokers on which they
reside.
*/
- Map<String, String> findBundlesForUnloading(LoadData loadData,
ServiceConfiguration conf);
+ Multimap<String, String> findBundlesForUnloading(LoadData loadData,
ServiceConfiguration conf);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
index cb9d6d038..b096018bd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/DeviationShedder.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
import java.util.Map;
import java.util.TreeSet;
@@ -61,7 +63,7 @@ public DeviationShedder() {
/**
* Recommend that all of the returned bundles be unloaded based on
observing excessive standard deviations according
* to some metric.
- *
+ *
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
@@ -69,8 +71,8 @@ public DeviationShedder() {
* @return A map from all selected bundles to the brokers on which they
reside.
*/
@Override
- public Map<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
- final Map<String, String> result = new HashMap<>();
+ public Multimap<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
+ final Multimap<String, String> result = ArrayListMultimap.create();
bundleTreeSetCache.clear();
metricTreeSetCache.clear();
double sum = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4b72a1070..23f651e71 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,11 @@
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -56,6 +61,7 @@
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -79,10 +85,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
public class ModularLoadManagerImpl implements ModularLoadManager,
ZooKeeperCacheListener<LocalBrokerData> {
private static final Logger log =
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
@@ -192,7 +194,7 @@ public ModularLoadManagerImpl() {
filterPipeline = new ArrayList<>();
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
- loadSheddingPipeline.add(new OverloadShedder(conf));
+ loadSheddingPipeline.add(new OverloadShedder());
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -583,28 +585,27 @@ public synchronized void doLoadShedding() {
-
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
recentlyUnloadedBundles.keySet().removeIf(e ->
recentlyUnloadedBundles.get(e) < timeout);
+
for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
- final Map<String, String> bundlesToUnload =
strategy.findBundlesForUnloading(loadData, conf);
- if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) {
- try {
- for (Map.Entry<String, String> entry :
bundlesToUnload.entrySet()) {
- final String broker = entry.getKey();
- final String bundle = entry.getValue();
-
- final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
- final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
- if(!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
- continue;
- }
- log.info("Unloading bundle: {} from broker {}",
bundle, broker);
+ final Multimap<String, String> bundlesToUnload =
strategy.findBundlesForUnloading(loadData, conf);
+
+ bundlesToUnload.asMap().forEach((broker, bundles) -> {
+ bundles.forEach(bundle -> {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
+ return;
+ }
+
+ log.info("[Overload shedder] Unloading bundle: {} from
broker {}", bundle, broker);
+ try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
+ } catch (PulsarServerException | PulsarAdminException e) {
+ log.warn("Error when trying to perform load shedding
on {} for broker {}", bundle, broker, e);
}
- } catch (Exception e) {
- log.warn("Error when trying to perform load shedding", e);
- }
- return;
- }
+ });
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index ed71defd4..972bfc974 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,10 +18,14 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import java.util.HashMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
import java.util.Map;
-import org.apache.pulsar.broker.BrokerData;
+import org.apache.bookkeeper.mledger.util.Pair;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
@@ -40,21 +44,15 @@
* rate that has not been recently unloaded.
*/
public class OverloadShedder implements LoadSheddingStrategy {
+
private static final Logger log =
LoggerFactory.getLogger(OverloadShedder.class);
- private Map<String, String> selectedBundlesCache;
- /**
- * Create an OverloadShedder with the service configuration.
- *
- * @param conf
- * Service configuration to create from.
- */
- public OverloadShedder(final ServiceConfiguration conf) {
- selectedBundlesCache = new HashMap<>();
- }
+ private final Multimap<String, String> selectedBundlesCache =
ArrayListMultimap.create();
+
+ private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
/**
- * Attempt to shed one bundle off every broker which is overloaded.
+ * Attempt to shed some bundles off every broker which is overloaded.
*
* @param loadData
* The load data to used to make the unloading decision.
@@ -62,51 +60,73 @@ public OverloadShedder(final ServiceConfiguration conf) {
* The service configuration.
* @return A map from bundles to unload to the brokers on which they are
loaded.
*/
- public Map<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
+ public Multimap<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
- for (final Map.Entry<String, BrokerData> entry :
loadData.getBrokerData().entrySet()) {
- final String broker = entry.getKey();
- final BrokerData brokerData = entry.getValue();
+
+ // Check every broker and select
+ loadData.getBrokerData().forEach((broker, brokerData) -> {
+
final LocalBrokerData localData = brokerData.getLocalData();
- final double maxUsage = localData.getMaxResourceUsage();
- if (maxUsage >= overloadThreshold) {
- log.info("Attempting to shed load on {}, which has max
resource usage {}%", broker, maxUsage);
- double maxMessageRate = Double.NEGATIVE_INFINITY;
- String mostTaxingBundle = null;
- if (localData.getBundles().size() > 1) {
- for (final String bundle : localData.getBundles()) {
- final BundleData bundleData =
loadData.getBundleData().get(bundle);
- if (bundleData == null) {
- continue;
- }
-
- // Consider short-term message rate to address system
resource burden
- final TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
- final double messageRate =
shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
- // The burden of checking the timestamp is for the
load manager, not the strategy.
- if (messageRate > maxMessageRate &&
!recentlyUnloadedBundles.containsKey(bundle)) {
- maxMessageRate = messageRate;
- mostTaxingBundle = bundle;
- }
- }
- if (mostTaxingBundle != null) {
- selectedBundlesCache.put(broker, mostTaxingBundle);
- } else {
- log.warn("Load shedding could not be performed on
broker {} because all bundles assigned to it "
- + "have recently been unloaded");
- }
- } else if (localData.getBundles().size() == 1) {
- log.warn(
- "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
- + "No Load Shedding will be done on this
broker",
- localData.getBundles().iterator().next(), broker);
- } else {
- log.warn("Broker {} is overloaded despite having no
bundles", broker);
+ final double currentUsage = localData.getMaxResourceUsage();
+ if (currentUsage < overloadThreshold) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Broker is not overloaded, ignoring at this
point", broker);
}
+ return;
}
- }
+
+ // We want to offload enough traffic such that this broker will go
below the overload threshold
+ // Also, add a small margin so that this broker won't be very
close to the threshold edge.
+ double percentOfTrafficToOffload = currentUsage -
overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() +
localData.getMsgThroughputOut();
+
+ double minimumThroughputToOffload = brokerCurrentThroughput *
percentOfTrafficToOffload;
+
+ log.info(
+ "Attempting to shed load on {}, which has max resource
usage above threshold {}% > {}% -- Offloading at least {} MByte/s of traffic",
+ broker, currentUsage, overloadThreshold,
minimumThroughputToOffload / 1024 / 1024);
+
+ MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+ MutableBoolean atLeastOneBundleSelected = new
MutableBoolean(false);
+
+ if (localData.getBundles().size() > 1) {
+ // Sort bundles by throughput, then pick the biggest N which
combined make up for at least the minimum throughput to offload
+
+ loadData.getBundleData().entrySet().stream().map((e) -> {
+ // Map to throughput value
+ // Consider short-term byte rate to address system
resource burden
+ String bundle = e.getKey();
+ BundleData bundleData = e.getValue();
+ TimeAverageMessageData shortTermData =
bundleData.getShortTermData();
+ double throughput = shortTermData.getMsgThroughputIn() +
shortTermData.getMsgThroughputOut();
+ return Pair.create(bundle, throughput);
+ }).filter(e -> {
+ // Only consider bundles that were not already unloaded
recently
+ return !recentlyUnloadedBundles.containsKey(e.first);
+ }).sorted((e1, e2) -> {
+ // Sort by throughput in reverse order
+ return Double.compare(e2.second, e1.second);
+ }).forEach(e -> {
+ if (trafficMarkedToOffload.doubleValue() <
minimumThroughputToOffload
+ || atLeastOneBundleSelected.isFalse()) {
+ selectedBundlesCache.put(broker, e.first);
+ trafficMarkedToOffload.add(e.second);
+ atLeastOneBundleSelected.setTrue();
+ }
+ });
+ } else if (localData.getBundles().size() == 1) {
+ log.warn(
+ "HIGH USAGE WARNING : Sole namespace bundle {} is
overloading broker {}. "
+ + "No Load Shedding will be done on this
broker",
+ localData.getBundles().iterator().next(), broker);
+ } else {
+ log.warn("Broker {} is overloaded despite having no bundles",
broker);
+ }
+
+ });
+
return selectedBundlesCache;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
new file mode 100644
index 000000000..7b0df5d5e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedderTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+public class OverloadShedderTest {
+
+ private final OverloadShedder os = new OverloadShedder();
+ private final ServiceConfiguration conf;
+
+ public OverloadShedderTest() {
+ conf = new ServiceConfiguration();
+ conf.setLoadBalancerBrokerOverloadedThresholdPercentage(85);
+ }
+
+ @Test
+ public void testNoBrokers() {
+ LoadData loadData = new LoadData();
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokersWithNoBundles() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerNotOverloaded() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(500, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(500, 1000));
+ broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+ BundleData bundle1 = new BundleData();
+ TimeAverageMessageData db1 = new TimeAverageMessageData();
+ db1.setMsgThroughputIn(1000);
+ db1.setMsgThroughputOut(1000);
+ bundle1.setShortTermData(db1);
+ loadData.getBundleData().put("bundle-1", bundle1);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerWithSingleBundle() {
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+ broker1.setBundles(Sets.newHashSet("bundle-1"));
+
+ BundleData bundle1 = new BundleData();
+ TimeAverageMessageData db1 = new TimeAverageMessageData();
+ db1.setMsgThroughputIn(1000);
+ db1.setMsgThroughputOut(1000);
+ bundle1.setShortTermData(db1);
+ loadData.getBundleData().put("bundle-1", bundle1);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ assertTrue(os.findBundlesForUnloading(loadData, conf).isEmpty());
+ }
+
+ @Test
+ public void testBrokerWithMultipleBundles() {
+ int numBundles = 10;
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+ double brokerThroghput = 0;
+
+ for (int i = 1; i <= numBundles; i++) {
+ broker1.getBundles().add("bundle-" + i);
+
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData db = new TimeAverageMessageData();
+
+ double throughput = i * 1024 * 1024;
+ db.setMsgThroughputIn(throughput);
+ db.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(db);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+
+ brokerThroghput += throughput;
+ }
+
+ broker1.setMsgThroughputIn(brokerThroghput);
+ broker1.setMsgThroughputOut(brokerThroghput);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ Multimap<String, String> bundlesToUnload =
os.findBundlesForUnloading(loadData, conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ assertEquals(bundlesToUnload.get("broker-1"),
Lists.newArrayList("bundle-10", "bundle-9"));
+ }
+
+ @Test
+ public void testFilterRecentlyUnloaded() {
+ int numBundles = 10;
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setBandwidthIn(new ResourceUsage(999, 1000));
+ broker1.setBandwidthOut(new ResourceUsage(999, 1000));
+
+ double brokerThroghput = 0;
+
+ for (int i = 1; i <= numBundles; i++) {
+ broker1.getBundles().add("bundle-" + i);
+
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData db = new TimeAverageMessageData();
+
+ double throughput = i * 1024 * 1024;
+ db.setMsgThroughputIn(throughput);
+ db.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(db);
+ loadData.getBundleData().put("bundle-" + i, bundle);
+
+ brokerThroghput += throughput;
+ }
+
+ broker1.setMsgThroughputIn(brokerThroghput);
+ broker1.setMsgThroughputOut(brokerThroghput);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+
+ loadData.getRecentlyUnloadedBundles().put("bundle-10", 1L);
+ loadData.getRecentlyUnloadedBundles().put("bundle-9", 1L);
+
+ Multimap<String, String> bundlesToUnload =
os.findBundlesForUnloading(loadData, conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ assertEquals(bundlesToUnload.get("broker-1"),
Lists.newArrayList("bundle-8", "bundle-7"));
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 4c754d2fb..129cf5d81 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -108,7 +108,7 @@ public LocalBrokerData(final String webServiceUrl, final
String webServiceUrlTls
/**
* Using the system resource usage and bundle stats acquired from the
Pulsar client, update this LocalBrokerData.
- *
+ *
* @param systemResourceUsage
* System resource usage (cpu, memory, and direct memory).
* @param bundleStats
@@ -123,7 +123,7 @@ public void update(final SystemResourceUsage
systemResourceUsage,
/**
* Using another LocalBrokerData, update this.
- *
+ *
* @param other
* LocalBrokerData to update from.
*/
@@ -196,10 +196,20 @@ private void updateBundleData(final Map<String,
NamespaceBundleStats> bundleStat
}
public double getMaxResourceUsage() {
- return Math
- .max(Math.max(Math.max(cpu.percentUsage(),
memory.percentUsage()),
- Math.max(directMemory.percentUsage(),
bandwidthIn.percentUsage())), bandwidthOut.percentUsage())
- / 100;
+ return max(cpu.percentUsage(), memory.percentUsage(),
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+ bandwidthOut.percentUsage()) / 100;
+ }
+
+ private static float max(float...args) {
+ float max = Float.NEGATIVE_INFINITY;
+
+ for (float d : args) {
+ if (d > max) {
+ max = d;
+ }
+ }
+
+ return max;
}
public String getLoadReportType() {
@@ -392,7 +402,7 @@ public String getPulsarServiceUrl() {
public String getPulsarServiceUrlTls() {
return pulsarServiceUrlTls;
}
-
+
@Override
public boolean isPersistentTopicsEnabled() {
return persistentTopicsEnabled;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services