This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bdfa1  Add ThresholdShedder Strategy for loadbalancer and expose loadbalance metric to prometheus (#6772)
b9bdfa1 is described below

commit b9bdfa12637fdeb67a23c410a799f1d4d4cb99d6
Author: hangc0276 <[email protected]>
AuthorDate: Wed Apr 22 16:48:41 2020 +0800

    Add ThresholdShedder Strategy for loadbalancer and expose loadbalance metric to prometheus (#6772)
    
    ### Motivation
    The only overload shedder strategy is `OverloadShedder`, which collects each broker's max resource usage and compares it with a threshold (default 85%). When the max resource usage reaches the threshold, it triggers bundle unloading, which migrates some of the broker's bundles to other brokers. The overload shedder strategy has the following drawbacks:
    - Other overload shedder strategies cannot be configured.
    - It is hard to choose the threshold value. The default threshold is 85%, but a broker's max resource usage rarely reaches 85%, which leads to unbalanced traffic between brokers. The read cache hit rate of heavily loaded brokers decreases.
    - When you restart most of the brokers in a Pulsar cluster at the same time, all the traffic in the cluster goes to the remaining brokers. The restarted brokers then receive no traffic for a long time, because the max resource usage of the remaining brokers does not reach the threshold.
    
    ### Changes
    1. Support multiple overload shedder strategies, configurable in `broker.conf`.
    2. Add a `ThresholdShedder` strategy. The main idea is as follows:
        - Calculate the average resource usage of the brokers and compare each broker's resource usage with that average. If a broker's usage is greater than the average plus the threshold, the overload shedder is triggered for that broker:
        `broker resource usage > average resource usage + threshold`
        - Each kind of resource (bandwidth in, bandwidth out, CPU, memory, and direct memory) has a weight (default 1.0) that is applied when calculating a broker's resource usage.
        - Record the historical average resource usage of the Pulsar broker cluster. The new average resource usage is calculated as follows:
        `new_avg = old_avg * factor + (1 - factor) * avg`
        `new_avg`: the newest average resource usage
        `old_avg`: the average resource usage calculated in the previous round
        `factor`: the decay factor, default value `0.9`
        `avg`: the current average resource usage of the brokers
    3. Expose load balancing metrics to Prometheus.
    4. Fix a bug in `OverloadShedder` that could select bundles owned by other brokers for unloading; now only bundles owned by the overloaded broker itself are selected.
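The trigger described in item 2 can be sketched in Java as below. This is an illustrative sketch, not the code in this commit: `ThresholdTriggerSketch` and its methods are hypothetical, usages are assumed to be fractions in [0, 1], and, like `ThresholdShedder`, it smooths each broker's usage first and then averages the smoothed values. Because averaging is linear, that yields the same `new_avg` as smoothing the cluster average directly, as long as the set of brokers does not change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ThresholdShedder trigger (not a Pulsar class).
// Each broker's usage is smoothed with new = old * factor + (1 - factor) * current,
// the smoothed values are averaged, and a broker is flagged at or above average + threshold.
class ThresholdTriggerSketch {
    // Smoothed usage per broker, carried over between load-shedding rounds.
    private final Map<String, Double> smoothedUsage = new HashMap<>();
    private final double factor;    // history weight, e.g. 0.9
    private final double threshold; // e.g. 10 (percent) -> 0.10

    ThresholdTriggerSketch(double factor, double threshold) {
        this.factor = factor;
        this.threshold = threshold;
    }

    // Applies the smoothing formula to one broker and returns its new value.
    double update(String broker, double currentUsage) {
        double old = smoothedUsage.getOrDefault(broker, 0.0);
        double updated = old * factor + (1 - factor) * currentUsage;
        smoothedUsage.put(broker, updated);
        return updated;
    }

    // Average of the smoothed usages over all known brokers (0 if there are none).
    double clusterAverage() {
        return smoothedUsage.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
    }

    // Mirrors the "currentUsage < avgUsage + threshold -> skip" check in ThresholdShedder.
    boolean isOverloaded(String broker) {
        double usage = smoothedUsage.getOrDefault(broker, 0.0);
        return !(usage < clusterAverage() + threshold);
    }
}
```

For example, with `factor = 0.5` and `threshold = 0.1`, brokers at 90% and 10% usage smooth to 0.45 and 0.05 after one round, the average is 0.25, and only the first broker is flagged (0.45 >= 0.35).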
    
    Please review this implementation. If it looks good, I will add test cases.
---
 conf/broker.conf                                   |  37 +++++
 conf/standalone.conf                               |  34 +++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  64 +++++++++
 .../broker/loadbalance/ModularLoadManager.java     |   9 ++
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 121 +++++++++++++++-
 .../impl/ModularLoadManagerWrapper.java            |   2 +-
 .../broker/loadbalance/impl/OverloadShedder.java   |   4 +-
 .../broker/loadbalance/impl/ThresholdShedder.java  | 158 +++++++++++++++++++++
 .../data/loadbalancer/LocalBrokerData.java         |  20 +++
 site2/docs/reference-metrics.md                    |  35 +++++
 10 files changed, 481 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b97deb6..2ac91bd 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -754,6 +754,43 @@ supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally
 # Default algorithm name for namespace bundle split
 defaultNamespaceBundleSplitAlgorithm=range_equally_divide
 
+# Load shedding strategy. Supports OverloadShedder and ThresholdShedder; the default is OverloadShedder.
+loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder
+
+# The broker resource usage threshold, in percent.
+# When a broker's resource usage is greater than the Pulsar cluster average resource usage plus this threshold,
+# the threshold shedder is triggered to offload bundles from the broker.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBrokerThresholdShedderPercentage=10
+
+# The weight of the historical resource usage when calculating the new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerHistoryResourcePercentage=0.9
+
+# The bandwidth-in usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBandwithInResourceWeight=1.0
+
+# The bandwidth-out usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBandwithOutResourceWeight=1.0
+
+# The CPU usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerCPUResourceWeight=1.0
+
+# The heap memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerMemoryResourceWeight=1.0
+
+# The direct memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerDirectMemoryResourceWeight=1.0
+
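+# Bundle unload minimum throughput threshold (MB). If the traffic to offload from a broker is below this value,
+# no bundle is unloaded, which avoids unloading bundles too frequently.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBundleUnloadMinThroughputThreshold=10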
+
 ### --- Replication --- ###
 
 # Enable replication metrics
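To show how `loadBalancerBrokerThresholdShedderPercentage` and `loadBalancerBundleUnloadMinThroughputThreshold` interact, here is a hedged Java sketch of the offload-sizing step in `ThresholdShedder`. `OffloadSizingSketch` and its method are hypothetical names; the 0.05 margin mirrors `ADDITIONAL_THRESHOLD_PERCENT_MARGIN`, usages are assumed to be fractions, and throughputs bytes per second.

```java
// Hypothetical sketch (not a Pulsar class) of how ThresholdShedder sizes the traffic to shed.
// Usages are fractions in [0, 1]; throughputs are in bytes per second.
final class OffloadSizingSketch {
    static final double MB = 1024 * 1024;
    // Same value as ADDITIONAL_THRESHOLD_PERCENT_MARGIN in ThresholdShedder.
    static final double MARGIN = 0.05;

    // Returns the minimum throughput to offload (bytes/s), or 0 when the broker is not
    // overloaded or the amount falls below loadBalancerBundleUnloadMinThroughputThreshold.
    static double minimumThroughputToOffload(double brokerUsage, double avgUsage,
                                             int thresholdPercentage, double brokerThroughput,
                                             double minThroughputThresholdMB) {
        double threshold = thresholdPercentage / 100.0;
        if (brokerUsage < avgUsage + threshold) {
            return 0; // not overloaded
        }
        double fractionToOffload = brokerUsage - avgUsage - threshold + MARGIN;
        double toOffload = brokerThroughput * fractionToOffload;
        if (toOffload < minThroughputThresholdMB * MB) {
            return 0; // too little traffic to justify an unload
        }
        return toOffload;
    }
}
```

With the defaults, a broker at 80% usage in a cluster averaging 50% sheds at least 0.8 - 0.5 - 0.1 + 0.05 = 25% of its throughput; at 400 MB/s that is about 100 MB/s, well above the 10 MB floor, while a broker carrying only 20 MB/s would be skipped (5 MB/s < 10 MB).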
diff --git a/conf/standalone.conf b/conf/standalone.conf
index b2b0e83..43138f8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -499,6 +499,40 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
 # maximum number of bundles in a namespace
 loadBalancerNamespaceMaximumBundles=128
 
+# The broker resource usage threshold, in percent.
+# When a broker's resource usage is greater than the Pulsar cluster average resource usage plus this threshold,
+# the threshold shedder is triggered to offload bundles from the broker.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBrokerThresholdShedderPercentage=10
+
+# The weight of the historical resource usage when calculating the new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerHistoryResourcePercentage=0.9
+
+# The bandwidth-in usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBandwithInResourceWeight=1.0
+
+# The bandwidth-out usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBandwithOutResourceWeight=1.0
+
+# The CPU usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerCPUResourceWeight=1.0
+
+# The heap memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerMemoryResourceWeight=1.0
+
+# The direct memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerDirectMemoryResourceWeight=1.0
+
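+# Bundle unload minimum throughput threshold (MB). If the traffic to offload from a broker is below this value,
+# no bundle is unloaded, which avoids unloading bundles too frequently.
+# It only takes effect in the ThresholdShedder strategy.
+loadBalancerBundleUnloadMinThroughputThreshold=10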
+
 ### --- Replication --- ###
 
 # Enable replication metrics
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 136b341..96ba8f2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1146,6 +1146,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
     )
     private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Load shedding strategy class name; supports OverloadShedder and ThresholdShedder"
+    )
+    private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.OverloadShedder";
+
     @FieldContext(
         dynamic = true,
         category = CATEGORY_LOAD_BALANCER,
@@ -1201,6 +1208,63 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Usage threshold to determine a broker as over-loaded"
     )
     private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Usage threshold (percentage above the cluster average usage) that triggers the threshold shedder for a broker"
+    )
+    private int loadBalancerBrokerThresholdShedderPercentage = 10;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Weight of the historical resource usage when updating the resource usage"
+    )
+    private double loadBalancerHistoryResourcePercentage = 0.9;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Bandwidth-in resource usage weight"
+    )
+    private double loadBalancerBandwithInResourceWeight = 1.0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Bandwidth-out resource usage weight"
+    )
+    private double loadBalancerBandwithOutResourceWeight = 1.0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "CPU Resource Usage Weight"
+    )
+    private double loadBalancerCPUResourceWeight = 1.0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Memory Resource Usage Weight"
+    )
+    private double loadBalancerMemoryResourceWeight = 1.0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Direct Memory Resource Usage Weight"
+    )
+    private double loadBalancerDirectMemoryResourceWeight = 1.0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Bundle unload minimum throughput threshold (MB)"
+    )
+    private double loadBalancerBundleUnloadMinThroughputThreshold = 10;
+
     @FieldContext(
         category = CATEGORY_LOAD_BALANCER,
         doc = "Interval to flush dynamic resource quota to ZooKeeper"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 658aeba..8538718 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
@@ -119,4 +121,11 @@ public interface ModularLoadManager {
      * @return
      */
     LocalBrokerData getBrokerLocalData(String broker);
+
+    /**
+     * Fetch load balancing metrics.
+     *
+     * @return List of LoadBalancing Metrics
+     */
+    List<Metrics> getLoadBalancingMetrics();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 11b410d..ba53837 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 
@@ -35,10 +36,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -67,6 +70,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
+import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -185,6 +189,17 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
     private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
             .readValue(content, LocalBrokerData.class);
 
+    // record load balancing metrics
+    private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference<>();
+    // record bundle unload metrics
+    private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference<>();
+    // record bundle split metrics
+    private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference<>();
+
+    private long bundleSplitCount = 0;
+    private long unloadBrokerCount = 0;
+    private long unloadBundleCount = 0;
+
     /**
      * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
      */
@@ -195,7 +210,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
         loadSheddingPipeline = new ArrayList<>();
-        loadSheddingPipeline.add(new OverloadShedder());
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
@@ -275,6 +289,25 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
         pulsar.getConfigurationCache().failureDomainCache()
                 .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
+
+        loadSheddingPipeline.add(createLoadSheddingStrategy());
+    }
+
+    private LoadSheddingStrategy createLoadSheddingStrategy() {
+        try {
+            Class<?> loadSheddingClass = Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
+            Object loadSheddingInstance = loadSheddingClass.getDeclaredConstructor().newInstance();
+            if (loadSheddingInstance instanceof LoadSheddingStrategy) {
+                return (LoadSheddingStrategy) loadSheddingInstance;
+            } else {
+                log.error("Load shedding strategy {} does not implement LoadSheddingStrategy, using OverloadShedder instead.",
+                        conf.getLoadBalancerLoadSheddingStrategy());
+                return new OverloadShedder();
+            }
+        } catch (Exception e) {
+            log.error("Error when trying to create load shedding strategy, using OverloadShedder instead: ", e);
+        }
+
+        return new OverloadShedder();
     }
 
     /**
@@ -593,9 +626,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                     }
                 });
             });
+
+            updateBundleUnloadingMetrics(bundlesToUnload);
         }
     }
 
+    /**
+     * As leader broker, update bundle unloading metrics.
+     *
+     * @param bundlesToUnload
+     */
+    private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
+        unloadBrokerCount += bundlesToUnload.keySet().size();
+        unloadBundleCount += bundlesToUnload.values().size();
+
+        List<Metrics> metrics = Lists.newArrayList();
+        Map<String, String> dimensions = new HashMap<>();
+
+        dimensions.put("metric", "bundleUnloading");
+
+        Metrics m = Metrics.create(dimensions);
+        m.put("brk_lb_unload_broker_count", unloadBrokerCount);
+        m.put("brk_lb_unload_bundle_count", unloadBundleCount);
+        metrics.add(m);
+        this.bundleUnloadMetrics.set(metrics);
+    }
+
     public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
         try {
             Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
@@ -659,11 +715,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                     log.error("Failed to split namespace bundle {}", bundleName, e);
                 }
             }
+
+            updateBundleSplitMetrics(bundlesToBeSplit);
         }
 
     }
 
     /**
+     * As leader broker, update bundle split metrics.
+     *
+     * @param bundlesToBeSplit
+     */
+    private void updateBundleSplitMetrics(Set<String> bundlesToBeSplit) {
+        bundleSplitCount += bundlesToBeSplit.size();
+
+        List<Metrics> metrics = Lists.newArrayList();
+        Map<String, String> dimensions = new HashMap<>();
+
+        dimensions.put("metric", "bundlesSplit");
+
+        Metrics m = Metrics.create(dimensions);
+        m.put("brk_lb_bundles_split_count", bundleSplitCount);
+        metrics.add(m);
+        this.bundleSplitMetrics.set(metrics);
+    }
+
+    /**
      * When the broker data ZooKeeper nodes are updated, update the broker data map.
      */
     @Override
@@ -851,6 +928,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         try {
             final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
             localData.update(systemResourceUsage, getBundleStats());
+            updateLoadBalancingMetrics(systemResourceUsage);
         } catch (Exception e) {
             log.warn("Error when attempting to update local broker data", e);
         }
@@ -858,6 +936,28 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
     }
 
     /**
+     * As any broker, update the load balancing metrics with the system resource usage percentages.
+     *
+     * @param systemResourceUsage
+     */
+    private void updateLoadBalancingMetrics(final SystemResourceUsage systemResourceUsage) {
+        List<Metrics> metrics = Lists.newArrayList();
+        Map<String, String> dimensions = new HashMap<>();
+
+        dimensions.put("broker", conf.getAdvertisedAddress());
+        dimensions.put("metric", "loadBalancing");
+
+        Metrics m = Metrics.create(dimensions);
+        m.put("brk_lb_cpu_usage", systemResourceUsage.getCpu().percentUsage());
+        m.put("brk_lb_memory_usage", systemResourceUsage.getMemory().percentUsage());
+        m.put("brk_lb_directMemory_usage", systemResourceUsage.getDirectMemory().percentUsage());
+        m.put("brk_lb_bandwidth_in_usage", systemResourceUsage.getBandwidthIn().percentUsage());
+        m.put("brk_lb_bandwidth_out_usage", systemResourceUsage.getBandwidthOut().percentUsage());
+        metrics.add(m);
+        this.loadBalancingMetrics.set(metrics);
+    }
+
+    /**
      * As any broker, write the local broker data to ZooKeeper.
      */
     @Override
@@ -987,4 +1087,23 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             return null;
         }
     }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        List<Metrics> metricsCollection = new ArrayList<>();
+
+        if (this.loadBalancingMetrics.get() != null) {
+            metricsCollection.addAll(this.loadBalancingMetrics.get());
+        }
+
+        if (this.bundleUnloadMetrics.get() != null) {
+            metricsCollection.addAll(this.bundleUnloadMetrics.get());
+        }
+
+        if (this.bundleSplitMetrics.get() != null) {
+            metricsCollection.addAll(this.bundleSplitMetrics.get());
+        }
+
+        return metricsCollection;
+    }
 }
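`createLoadSheddingStrategy()` loads the class named in `loadBalancerLoadSheddingStrategy` by reflection and falls back to `OverloadShedder`. The following self-contained sketch shows the same pattern under stated assumptions: `Strategy`, `DefaultStrategy`, `CustomStrategy` and `StrategyLoaderSketch` are hypothetical stand-ins rather than Pulsar types, and it uses `getDeclaredConstructor().newInstance()` instead of `Class.newInstance()`.

```java
// Hypothetical stand-in for LoadSheddingStrategy.
interface Strategy {
    String name();
}

// Fallback used when the configured class cannot be loaded (plays the role of OverloadShedder).
class DefaultStrategy implements Strategy {
    public String name() {
        return "default";
    }
}

// An alternative implementation a config value could point to (plays the role of ThresholdShedder).
class CustomStrategy implements Strategy {
    public String name() {
        return "custom";
    }
}

final class StrategyLoaderSketch {
    // Loads a strategy by fully qualified class name, falling back to DefaultStrategy.
    static Strategy load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // getDeclaredConstructor().newInstance() avoids Class.newInstance(), deprecated since Java 9.
            Object instance = clazz.getDeclaredConstructor().newInstance();
            if (instance instanceof Strategy) {
                return (Strategy) instance;
            }
            System.err.println(className + " does not implement Strategy, using the default");
        } catch (ReflectiveOperationException e) {
            System.err.println("Failed to create " + className + ", using the default: " + e);
        }
        return new DefaultStrategy();
    }
}
```

Falling back instead of failing startup keeps the broker running when the class name is misconfigured; the trade-off is that the mistake only shows up in the error log.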
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 8450df5..701a217 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -83,7 +83,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
     
     @Override
     public List<Metrics> getLoadBalancingMetrics() {
-        return Collections.emptyList();
+        return loadManager.getLoadBalancingMetrics();
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index bda69fe..d67944c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -110,7 +110,9 @@ public class OverloadShedder implements LoadSheddingStrategy {
                 }).filter(e -> {
                     // Only consider bundles that were not already unloaded recently
                     return !recentlyUnloadedBundles.containsKey(e.getLeft());
-                }).sorted((e1, e2) -> {
+                }).filter(e ->
+                        localData.getBundles().contains(e.getLeft())
+                ).sorted((e1, e2) -> {
                     // Sort by throughput in reverse order
                     return Double.compare(e2.getRight(), e1.getRight());
                 }).forEach(e -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
new file mode 100644
index 0000000..a7bac07
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThresholdShedder implements LoadSheddingStrategy {
+    private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
+
+    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
+
+    private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
+
+    private static final double MB = 1024 * 1024;
+
+    private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
+
+    @Override
+    public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
+        selectedBundlesCache.clear();
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
+        final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+
+        final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
+
+        if (avgUsage == 0) {
+            log.warn("average max resource usage is 0");
+            return selectedBundlesCache;
+        }
+
+        loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage < avgUsage + threshold) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
+                }
+                return;
+            }
+
+            double percentOfTrafficToOffload = currentUsage - avgUsage - threshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+            double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
+
+            if (minimumThroughputToOffload < minThroughputThreshold) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] broker is planning to shed throughput {} MByte/s less than "
+                                    + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                            broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+                }
+                return;
+            }
+
+            log.info(
+                    "Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%"
+                            + " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
+                    broker, 100 * currentUsage, 100 * avgUsage, 100 * threshold, minimumThroughputToOffload / MB,
+                    (brokerCurrentThroughput - minimumThroughputToOffload) / MB);
+
+            MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+            MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+
+            if (localData.getBundles().size() > 1) {
+                loadData.getBundleData().entrySet().stream().map((e) -> {
+                    String bundle = e.getKey();
+                    BundleData bundleData = e.getValue();
+                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
+                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
+                    return Pair.of(bundle, throughput);
+                }).filter(e ->
+                        !recentlyUnloadedBundles.containsKey(e.getLeft())
+                ).filter(e ->
+                        localData.getBundles().contains(e.getLeft())
+                ).sorted((e1, e2) ->
+                        Double.compare(e2.getRight(), e1.getRight())
+                ).forEach(e -> {
+                    if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
+                            || atLeastOneBundleSelected.isFalse()) {
+                        selectedBundlesCache.put(broker, e.getLeft());
+                        trafficMarkedToOffload.add(e.getRight());
+                        atLeastOneBundleSelected.setTrue();
+                    }
+                });
+            } else if (localData.getBundles().size() == 1) {
+                log.warn(
+                        "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+                                + "No load shedding will be done on this broker",
+                        localData.getBundles().iterator().next(), broker);
+            } else {
+                log.warn("Broker {} is overloaded despite having no bundles", broker);
+            }
+        });
+
+        return selectedBundlesCache;
+    }
+
+    private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
+                                     final ServiceConfiguration conf) {
+        double totalUsage = 0.0;
+        int totalBrokers = 0;
+
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            LocalBrokerData localBrokerData = entry.getValue().getLocalData();
+            String broker = entry.getKey();
+            updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
+            totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            totalBrokers++;
+        }
+
+        return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
+    }
+
+    private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage,
+                                        final ServiceConfiguration conf) {
+        double historyUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+        historyUsage = historyUsage * historyPercentage
+                + (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight(
+                        conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight());
+        brokerAvgResourceUsage.put(broker, historyUsage);
+    }
+
+}
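The bundle selection loop in `findBundlesForUnloading` can be summarized by the following Java sketch. `BundleSelectionSketch` is a hypothetical helper, not part of the commit, and it takes plain maps and sets instead of `LoadData`. Because candidates are sorted by descending throughput, stopping at the first bundle that is no longer needed is equivalent to the `forEach` guarded by `trafficMarkedToOffload` and `atLeastOneBundleSelected`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch (not a Pulsar class) of ThresholdShedder's bundle selection:
// skip recently unloaded bundles and bundles owned by other brokers, sort by throughput
// (highest first), and pick bundles until the target is reached, always picking at least one.
final class BundleSelectionSketch {
    static List<String> select(Map<String, Double> bundleThroughput, Set<String> ownedBundles,
                               Set<String> recentlyUnloaded, double minimumThroughputToOffload) {
        List<String> selected = new ArrayList<>();
        // A broker with a single bundle is not shed (ThresholdShedder only logs a warning).
        if (ownedBundles.size() <= 1) {
            return selected;
        }
        List<Map.Entry<String, Double>> candidates = bundleThroughput.entrySet().stream()
                .filter(e -> !recentlyUnloaded.contains(e.getKey()))
                .filter(e -> ownedBundles.contains(e.getKey()))
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .collect(Collectors.toList());
        double marked = 0;
        for (Map.Entry<String, Double> e : candidates) {
            // Once enough traffic is marked, the remaining (smaller) bundles are not needed.
            if (marked >= minimumThroughputToOffload && !selected.isEmpty()) {
                break;
            }
            selected.add(e.getKey());
            marked += e.getValue();
        }
        return selected;
    }
}
```

For instance, with owned bundles carrying 50, 30 and 20 MB/s and a target of 60 MB/s, the sketch selects the 50 and 30 MB/s bundles; with a target of 0 it still selects the largest one.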
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index a2d9814..6b4ea64 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -212,6 +212,26 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
                 bandwidthOut.percentUsage());
     }
 
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwithInWeight,
+                                                final double bandWithOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwithInWeight,
+                bandwidthOut.percentUsage() * bandWithOutWeight) / 100;
+    }
+
+    private static double max(double... args) {
+        double max = Double.NEGATIVE_INFINITY;
+
+        for (double d : args) {
+            if (d > max) {
+                max = d;
+            }
+        }
+
+        return max;
+    }
+
     private static float max(float...args) {
         float max = Float.NEGATIVE_INFINITY;
 
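A hedged sketch of what `getMaxResourceUsageWithWeight` computes, written as a standalone static method; `WeightedUsageSketch` is hypothetical and takes raw numbers instead of `ResourceUsage` objects. Percent usages are multiplied by their weights and the largest product is divided by 100, so the result is a fraction comparable with the threshold and average used by `ThresholdShedder`.

```java
// Hypothetical standalone version (not a Pulsar class) of the weighted max-usage calculation.
final class WeightedUsageSketch {
    // Inputs are percent usages (0-100) and per-resource weights; returns a fraction.
    static double maxResourceUsageWithWeight(double cpu, double memory, double directMemory,
                                             double bandwidthIn, double bandwidthOut,
                                             double cpuWeight, double memoryWeight,
                                             double directMemoryWeight, double bandwidthInWeight,
                                             double bandwidthOutWeight) {
        double[] weighted = {
                cpu * cpuWeight,
                memory * memoryWeight,
                directMemory * directMemoryWeight,
                bandwidthIn * bandwidthInWeight,
                bandwidthOut * bandwidthOutWeight
        };
        double max = Double.NEGATIVE_INFINITY;
        for (double value : weighted) {
            max = Math.max(max, value);
        }
        return max / 100; // percent -> fraction
    }
}
```

Taking the maximum means a single saturated resource drives shedding. Lowering a weight changes which resource dominates: with the bandwidth-in weight at 0.5, a broker at 70% bandwidth-in and 60% CPU counts as 60% loaded instead of 70%.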
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index f629344..70c3120 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -100,6 +100,9 @@ Broker has the following kinds of metrics:
     * [Replication metrics](#replication-metrics)
 * [Topic metrics](#topic-metrics)
     * [Replication metrics](#replication-metrics-1)
+* [LoadBalancing metrics](#loadbalancing-metrics)
+    * [BundleUnloading metrics](#bundleunloading-metrics)
+    * [BundleSplit metrics](#bundlesplit-metrics)
 * [Subscription metrics](#subscription-metrics)
 * [Consumer metrics](#consumer-metrics)
 
@@ -190,6 +193,38 @@ All the replication metrics will also be labelled with `remoteCluster=${pulsar_r
 | pulsar_replication_throughput_out | Gauge | The total throughput of the topic replicating to remote cluster (bytes/second). |
 | pulsar_replication_backlog | Gauge | The total backlog of the topic replicating to remote cluster (messages). |
 
+### LoadBalancing metrics
+All the loadbalancing metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- broker: broker=${broker}. ${broker} is the advertised address of the broker.
+- metric: metric="loadBalancing".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_bandwidth_in_usage | Gauge | The broker bandwidth-in usage (percent) |
+| pulsar_lb_bandwidth_out_usage | Gauge | The broker bandwidth-out usage (percent) |
+| pulsar_lb_cpu_usage | Gauge | The broker CPU usage (percent) |
+| pulsar_lb_directMemory_usage | Gauge | The broker process direct memory usage (percent) |
+| pulsar_lb_memory_usage | Gauge | The broker process memory usage (percent) |
+
+#### BundleUnloading metrics
+All the bundleUnloading metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- metric: metric="bundleUnloading".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_unload_broker_count | Counter | Total number of brokers that had bundles unloaded, accumulated over all bundle unloading rounds |
+| pulsar_lb_unload_bundle_count | Counter | Total number of bundles unloaded, accumulated over all bundle unloading rounds |
+
+#### BundleSplit metrics
+All the bundleSplit metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- metric: metric="bundlesSplit".
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_lb_bundles_split_count | Counter | Total number of bundles split, accumulated over all bundle split checks |
 
 ### Subscription metrics
 
