This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9bdfa1 Add ThresholdShedder Strategy for loadbalancer and expose
loadbalance metric to prometheus (#6772)
b9bdfa1 is described below
commit b9bdfa12637fdeb67a23c410a799f1d4d4cb99d6
Author: hangc0276 <[email protected]>
AuthorDate: Wed Apr 22 16:48:41 2020 +0800
Add ThresholdShedder Strategy for loadbalancer and expose loadbalance
metric to prometheus (#6772)
### Motivation
The only load shedding strategy is `OverloadShedder`, which collects each
broker's max resource usage and compares it with a threshold (default value is
85%). When the max resource usage reaches the threshold, it triggers bundle
unloading, which migrates some of the bundles to other brokers. This
strategy has the following drawbacks:
- Other load shedding strategies cannot be configured.
- It is hard to determine the threshold value. The default threshold is
85%, but a broker's max resource usage rarely reaches 85%, which leads to
unbalanced traffic between brokers. The read cache hit rate of the brokers
with heavy traffic will decrease.
- When you restart most of the brokers in a Pulsar cluster at the same time,
all traffic in the cluster goes to the remaining brokers. The restarted
brokers will have no traffic for a long time, because the max resource usage
of the remaining brokers does not reach the threshold.
### Changes
1. Support multiple load shedding strategies; the strategy only needs to be
configured in `broker.conf`.
2. Add the `ThresholdShedder` strategy. The main idea is as follows:
- Calculate the average resource usage of the brokers and compare each
broker's resource usage with that average. If it is greater than the
average value plus the threshold, load shedding is triggered.
`broker resource usage > average resource usage + threshold`
- Each kind of resource (i.e. bandwidthIn, bandwidthOut, CPU, Memory,
Direct Memory) has a weight (default is 1.0) when calculating the broker's
resource usage.
- Record the historical average resource usage of the Pulsar broker cluster.
The new average resource usage is calculated as follows:
`new_avg = old_avg * factor + (1-factor) * avg`
`new_avg`: newest average resource usage
`old_avg`: old average resource usage, calculated in the last round
`factor`: the decrease factor, default value is `0.9`
`avg`: the average resource usage of the brokers
3. Expose load balance metrics to Prometheus.
4. Fix a bug in `OverloadShedder` so that it only selects bundles owned by
the overloaded broker itself for unloading.
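
The smoothing and comparison steps above can be sketched in Java roughly as
follows. This is a simplified illustration under stated assumptions, not the
committed `ThresholdShedder` code: the class `ThresholdShedderSketch` and its
method names are hypothetical, usages are assumed to be fractions in [0, 1],
and, as in the diff, the smoothing is applied to each broker's weighted max
usage before the cluster average is taken.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ThresholdShedder decision; names are illustrative.
public class ThresholdShedderSketch {

    // new_avg = old_avg * factor + (1 - factor) * current
    static double smooth(double oldAvg, double current, double factor) {
        return oldAvg * factor + (1 - factor) * current;
    }

    // Largest weighted usage across resources (CPU, memory, bandwidth, ...).
    static double maxWeightedUsage(double[] usages, double[] weights) {
        double max = Double.NEGATIVE_INFINITY;
        for (int i = 0; i < usages.length; i++) {
            max = Math.max(max, usages[i] * weights[i]);
        }
        return max;
    }

    // Mean of the per-broker smoothed usages; 0 when there are no brokers.
    static double mean(Map<String, Double> usageByBroker) {
        if (usageByBroker.isEmpty()) {
            return 0.0;
        }
        double total = 0.0;
        for (double usage : usageByBroker.values()) {
            total += usage;
        }
        return total / usageByBroker.size();
    }

    // A broker is shed when its usage is not below average + threshold.
    static boolean isOverloaded(double brokerUsage, double avgUsage, double threshold) {
        return brokerUsage >= avgUsage + threshold;
    }

    public static void main(String[] args) {
        Map<String, Double> smoothed = new HashMap<>();
        smoothed.put("broker-a", 0.80); // assumed sample values
        smoothed.put("broker-b", 0.30);
        smoothed.put("broker-c", 0.40);

        double avg = mean(smoothed);
        double threshold = 10 / 100.0; // loadBalancerBrokerThresholdShedderPercentage=10

        for (Map.Entry<String, Double> e : smoothed.entrySet()) {
            System.out.println(e.getKey() + " overloaded: "
                    + isOverloaded(e.getValue(), avg, threshold));
        }
    }
}
```

With a history factor of `0.9`, a sudden spike moves a broker's smoothed
usage by only a tenth of the change per round, so shedding reacts to
sustained imbalance rather than to short bursts.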
Please help check this implementation. If it is OK, I will add test cases.
---
conf/broker.conf | 37 +++++
conf/standalone.conf | 34 +++++
.../apache/pulsar/broker/ServiceConfiguration.java | 64 +++++++++
.../broker/loadbalance/ModularLoadManager.java | 9 ++
.../loadbalance/impl/ModularLoadManagerImpl.java | 121 +++++++++++++++-
.../impl/ModularLoadManagerWrapper.java | 2 +-
.../broker/loadbalance/impl/OverloadShedder.java | 4 +-
.../broker/loadbalance/impl/ThresholdShedder.java | 158 +++++++++++++++++++++
.../data/loadbalancer/LocalBrokerData.java | 20 +++
site2/docs/reference-metrics.md | 35 +++++
10 files changed, 481 insertions(+), 3 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b97deb6..2ac91bd 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -754,6 +754,43 @@
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally
# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
+# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
+loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder
+
+# The broker resource usage threshold.
+# When the broker resource usage is gratter than the pulsar cluster average resource usge,
+# the threshold shedder will be triggered to offload bundles from the broker.
+# It only take effect in ThresholdSheddler strategy.
+loadBalancerBrokerThresholdShedderPercentage=10
+
+# When calculating new resource usage, the history usage accounts for.
+# It only take effect in ThresholdSheddler strategy.
+loadBalancerHistoryResourcePercentage=0.9
+
+# The BandWithIn usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBandwithInResourceWeight=1.0
+
+# The BandWithOut usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBandwithOutResourceWeight=1.0
+
+# The CPU usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerCPUResourceWeight=1.0
+
+# The heap memory usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerMemoryResourceWeight=1.0
+
+# The direct memory usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerDirectMemoryResourceWeight=1.0
+
+# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBundleUnloadMinThroughputThreshold=10
+
### --- Replication --- ###
# Enable replication metrics
diff --git a/conf/standalone.conf b/conf/standalone.conf
index b2b0e83..43138f8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -499,6 +499,40 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128
+# The broker resource usage threshold.
+# When the broker resource usage is gratter than the pulsar cluster average resource usge,
+# the threshold shedder will be triggered to offload bundles from the broker.
+# It only take effect in ThresholdSheddler strategy.
+loadBalancerBrokerThresholdShedderPercentage=10
+
+# When calculating new resource usage, the history usage accounts for.
+# It only take effect in ThresholdSheddler strategy.
+loadBalancerHistoryResourcePercentage=0.9
+
+# The BandWithIn usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBandwithInResourceWeight=1.0
+
+# The BandWithOut usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBandwithOutResourceWeight=1.0
+
+# The CPU usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerCPUResourceWeight=1.0
+
+# The heap memory usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerMemoryResourceWeight=1.0
+
+# The direct memory usage weight when calculating new resourde usage.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerDirectMemoryResourceWeight=1.0
+
+# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
+# It only take effect in ThresholdShedder strategy.
+loadBalancerBundleUnloadMinThroughputThreshold=10
+
### --- Replication --- ###
# Enable replication metrics
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 136b341..96ba8f2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1146,6 +1146,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
)
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
+
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "load balance load shedding strategy"
+ )
+ private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.OverloadShedder";
+
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
@@ -1201,6 +1208,63 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Usage threshold to determine a broker as over-loaded"
)
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Usage threshold to determine a broker whether to start threshold shedder"
+ )
+ private int loadBalancerBrokerThresholdShedderPercentage = 10;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Resource history Usage Percentage When adding new resource usage info"
+ )
+ private double loadBalancerHistoryResourcePercentage = 0.9;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "BandwithIn Resource Usage Weight"
+ )
+ private double loadBalancerBandwithInResourceWeight = 1.0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "BandwithOut Resource Usage Weight"
+ )
+ private double loadBalancerBandwithOutResourceWeight = 1.0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "CPU Resource Usage Weight"
+ )
+ private double loadBalancerCPUResourceWeight = 1.0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Memory Resource Usage Weight"
+ )
+ private double loadBalancerMemoryResourceWeight = 1.0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Direct Memory Resource Usage Weight"
+ )
+ private double loadBalancerDirectMemoryResourceWeight = 1.0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Bundle unload minimum throughput threshold (MB)"
+ )
+ private double loadBalancerBundleUnloadMinThroughputThreshold = 10;
+
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Interval to flush dynamic resource quota to ZooKeeper"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 658aeba..8538718 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
@@ -119,4 +121,11 @@ public interface ModularLoadManager {
* @return
*/
LocalBrokerData getBrokerLocalData(String broker);
+
+ /**
+ * Fetch load balancing metrics.
+ *
+ * @return List of LoadBalancing Metrics
+ */
+ List<Metrics> getLoadBalancingMetrics();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 11b410d..ba53837 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -35,10 +36,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
@@ -67,6 +70,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
+import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -185,6 +189,17 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);
+ // record load balancing metrics
+ private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference<>();
+ // record bundle unload metrics
+ private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference<>();
+ // record bundle split metrics
+ private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference<>();
+
+ private long bundleSplitCount = 0;
+ private long unloadBrokerCount = 0;
+ private long unloadBundleCount = 0;
+
/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
*/
@@ -195,7 +210,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
filterPipeline = new ArrayList<>();
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
- loadSheddingPipeline.add(new OverloadShedder());
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -275,6 +289,25 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
.registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
pulsar.getConfigurationCache().failureDomainCache()
.registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
+
+ loadSheddingPipeline.add(createLoadSheddingStrategy());
+ }
+
+ private LoadSheddingStrategy createLoadSheddingStrategy() {
+ try {
+ Class<?> loadSheddingClass = Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
+ Object loadSheddingInstance = loadSheddingClass.newInstance();
+ if (loadSheddingInstance instanceof LoadSheddingStrategy) {
+ return (LoadSheddingStrategy) loadSheddingInstance;
+ } else {
+ log.error("create load shedding strategy failed. using OverloadShedder instead.");
+ return new OverloadShedder();
+ }
+ } catch (Exception e) {
+ log.error("Error when trying to create load shedding strategy: ", e);
+ }
+
+ return new OverloadShedder();
}
/**
@@ -593,9 +626,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
});
});
+
+ updateBundleUnloadingMetrics(bundlesToUnload);
}
}
+ /**
+ * As leader broker, update bundle unloading metrics.
+ *
+ * @param bundlesToUnload
+ */
+ private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
+ unloadBrokerCount += bundlesToUnload.keySet().size();
+ unloadBundleCount += bundlesToUnload.values().size();
+
+ List<Metrics> metrics = Lists.newArrayList();
+ Map<String, String> dimensions = new HashMap<>();
+
+ dimensions.put("metric", "bundleUnloading");
+
+ Metrics m = Metrics.create(dimensions);
+ m.put("brk_lb_unload_broker_count", unloadBrokerCount);
+ m.put("brk_lb_unload_bundle_count", unloadBundleCount);
+ metrics.add(m);
+ this.bundleUnloadMetrics.set(metrics);
+ }
+
public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
try {
Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
@@ -659,11 +715,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
log.error("Failed to split namespace bundle {}", bundleName, e);
}
}
+
+ updateBundleSplitMetrics(bundlesToBeSplit);
}
}
/**
+ * As leader broker, update bundle split metrics.
+ *
+ * @param bundlesToBeSplit
+ */
+ private void updateBundleSplitMetrics(Set<String> bundlesToBeSplit) {
+ bundleSplitCount += bundlesToBeSplit.size();
+
+ List<Metrics> metrics = Lists.newArrayList();
+ Map<String, String> dimensions = new HashMap<>();
+
+ dimensions.put("metric", "bundlesSplit");
+
+ Metrics m = Metrics.create(dimensions);
+ m.put("brk_lb_bundles_split_count", bundleSplitCount);
+ metrics.add(m);
+ this.bundleSplitMetrics.set(metrics);
+ }
+
+ /**
* When the broker data ZooKeeper nodes are updated, update the broker data map.
*/
@Override
@@ -851,6 +928,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
try {
final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
localData.update(systemResourceUsage, getBundleStats());
+ updateLoadBalancingMetrics(systemResourceUsage);
} catch (Exception e) {
log.warn("Error when attempting to update local broker data", e);
}
@@ -858,6 +936,28 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
/**
+ * As any broker, update System Resource Usage Percentage.
+ *
+ * @param systemResourceUsage
+ */
+ private void updateLoadBalancingMetrics(final SystemResourceUsage systemResourceUsage) {
+ List<Metrics> metrics = Lists.newArrayList();
+ Map<String, String> dimensions = new HashMap<>();
+
+ dimensions.put("broker", conf.getAdvertisedAddress());
+ dimensions.put("metric", "loadBalancing");
+
+ Metrics m = Metrics.create(dimensions);
+ m.put("brk_lb_cpu_usage", systemResourceUsage.getCpu().percentUsage());
+ m.put("brk_lb_memory_usage", systemResourceUsage.getMemory().percentUsage());
+ m.put("brk_lb_directMemory_usage", systemResourceUsage.getDirectMemory().percentUsage());
+ m.put("brk_lb_bandwidth_in_usage", systemResourceUsage.getBandwidthIn().percentUsage());
+ m.put("brk_lb_bandwidth_out_usage", systemResourceUsage.getBandwidthOut().percentUsage());
+ metrics.add(m);
+ this.loadBalancingMetrics.set(metrics);
+ }
+
+ /**
* As any broker, write the local broker data to ZooKeeper.
*/
@Override
@@ -987,4 +1087,23 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
return null;
}
}
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ List<Metrics> metricsCollection = new ArrayList<>();
+
+ if (this.loadBalancingMetrics.get() != null) {
+ metricsCollection.addAll(this.loadBalancingMetrics.get());
+ }
+
+ if (this.bundleUnloadMetrics.get() != null) {
+ metricsCollection.addAll(this.bundleUnloadMetrics.get());
+ }
+
+ if (this.bundleSplitMetrics.get() != null) {
+ metricsCollection.addAll(this.bundleSplitMetrics.get());
+ }
+
+ return metricsCollection;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 8450df5..701a217 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -83,7 +83,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
@Override
public List<Metrics> getLoadBalancingMetrics() {
- return Collections.emptyList();
+ return loadManager.getLoadBalancingMetrics();
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index bda69fe..d67944c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -110,7 +110,9 @@ public class OverloadShedder implements LoadSheddingStrategy {
}).filter(e -> {
// Only consider bundles that were not already unloaded recently
return !recentlyUnloadedBundles.containsKey(e.getLeft());
- }).sorted((e1, e2) -> {
+ }).filter(e ->
+ localData.getBundles().contains(e.getLeft())
+ ).sorted((e1, e2) -> {
// Sort by throughput in reverse order
return Double.compare(e2.getRight(), e1.getRight());
}).forEach(e -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
new file mode 100644
index 0000000..a7bac07
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThresholdShedder implements LoadSheddingStrategy {
+ private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
+
+ private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
+
+ private final static double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
+
+ private final double MB = 1024 * 1024;
+
+ private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
+
+ @Override
+ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
+ selectedBundlesCache.clear();
+ final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+ final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
+ final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+
+ final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
+
+ if (avgUsage == 0) {
+ log.warn("average max resource usage is 0");
+ return selectedBundlesCache;
+ }
+
+ loadData.getBrokerData().forEach((broker, brokerData) -> {
+ final LocalBrokerData localData = brokerData.getLocalData();
+ final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+ if (currentUsage < avgUsage + threshold) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
+ }
+ return;
+ }
+
+ double percentOfTrafficToOffload = currentUsage - avgUsage - threshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+ double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+ double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
+
+ if (minimumThroughputToOffload < minThroughputThreshold) {
+ if (log.isDebugEnabled()) {
+ log.info("[{}] broker is planning to shed throughput {} MByte/s less than " +
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+ broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+ }
+ return;
+ }
+
+ log.info(
+ "Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%" +
+ " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
+ broker, currentUsage, avgUsage, threshold, minimumThroughputToOffload / MB,
+ (brokerCurrentThroughput - minimumThroughputToOffload) / MB);
+
+ MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+ MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+
+ if (localData.getBundles().size() > 1) {
+ loadData.getBundleData().entrySet().stream().map((e) -> {
+ String bundle = e.getKey();
+ BundleData bundleData = e.getValue();
+ TimeAverageMessageData shortTermData = bundleData.getShortTermData();
+ double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
+ return Pair.of(bundle, throughput);
+ }).filter(e ->
+ !recentlyUnloadedBundles.containsKey(e.getLeft())
+ ).filter(e ->
+ localData.getBundles().contains(e.getLeft())
+ ).sorted((e1, e2) ->
+ Double.compare(e2.getRight(), e1.getRight())
+ ).forEach(e -> {
+ if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
+ || atLeastOneBundleSelected.isFalse()) {
+ selectedBundlesCache.put(broker, e.getLeft());
+ trafficMarkedToOffload.add(e.getRight());
+ atLeastOneBundleSelected.setTrue();
+ }
+ });
+ } else if (localData.getBundles().size() == 1) {
+ log.warn(
+ "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " +
+ "No Load Shadding will be done on this broker",
+ localData.getBundles().iterator().next(), broker);
+ } else {
+ log.warn("Broker {} is overloaded despit having no bundles", broker);
+ }
+ });
+
+ return selectedBundlesCache;
+ }
+
+ private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
+ final ServiceConfiguration conf) {
+ double totalUsage = 0.0;
+ int totalBrokers = 0;
+
+ for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+ LocalBrokerData localBrokerData = entry.getValue().getLocalData();
+ String broker = entry.getKey();
+ updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
+ totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+ totalBrokers++;
+ }
+
+ return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
+ }
+
+ private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage,
+ final ServiceConfiguration conf) {
+ double historyUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+ historyUsage = historyUsage * historyPercentage +
+ (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight(conf.getLoadBalancerCPUResourceWeight(),
+ conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+ conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight());
+ brokerAvgResourceUsage.put(broker, historyUsage);
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index a2d9814..6b4ea64 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -212,6 +212,26 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
bandwidthOut.percentUsage());
}
+ public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+ final double directMemoryWeight, final double bandwithInWeight,
+ final double bandWithOutWeight) {
+ return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+ directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwithInWeight,
+ bandwidthOut.percentUsage() * bandWithOutWeight) / 100;
+ }
+
+ private static double max(double... args) {
+ double max = Double.NEGATIVE_INFINITY;
+
+ for (double d : args) {
+ if (d > max) {
+ max = d;
+ }
+ }
+
+ return max;
+ }
+
private static float max(float...args) {
float max = Float.NEGATIVE_INFINITY;
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index f629344..70c3120 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -100,6 +100,9 @@ Broker has the following kinds of metrics:
* [Replication metrics](#replication-metrics)
* [Topic metrics](#topic-metrics)
* [Replication metrics](#replication-metrics-1)
+* [LoadBalancing metrics](#loadbalancing-metrics)
+ * [BundleUnloading metrics](#bundleunloading-metrics)
+ * [BundleSplit metrics](#bundlesplit-metrics)
* [Subscription metrics](#subscription-metrics)
* [Consumer metrics](#consumer-metrics)
@@ -190,6 +193,38 @@ All the replication metrics will also be labelled with `remoteCluster=${pulsar_r
| pulsar_replication_throughput_out | Gauge | The total throughput of the topic replicating to remote cluster (bytes/second). |
| pulsar_replication_backlog | Gauge | The total backlog of the topic replicating to remote cluster (messages). |
+### LoadBalancing metrics
+All the loadbalancing metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- broker: broker=${broker}. ${broker} is the ip address of the broker
+- metric: metric="loadBalancing".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_bandwidth_in_usage | Gauge | The broker bandwith in usage |
+| pulsar_lb_bandwidth_out_usage | Gauge | The broker bandwith out usage |
+| pulsar_lb_cpu_usage | Gauge | The broker cpu usage |
+| pulsar_lb_directMemory_usage | Gauge | The broker process direct memory usage |
+| pulsar_lb_memory_usage | Gauge | The broker process memory usage |
+
+#### BundleUnloading metrics
+All the bundleUnloading metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- metric: metric="bundleUnloading".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_unload_broker_count | Counter | Unload broker count in this bundle unloading |
+| pulsar_lb_unload_bundle_count | Counter | Bundle unload count in this bundle unloading |
+
+#### BundleSplit metrics
+All the bundleUnloading metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- metric: metric="bundlesSplit".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_bundles_split_count | Counter | bundle split count in this bundle splitting check interval |
### Subscription metrics