This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c0a823a242a [fix][broker] Revert "[fix][load-balancer] skip mis-configured resource usage(>100%) in load balancer" (#18645)
c0a823a242a is described below

commit c0a823a242a834dac35b9a6fcd6a2064a0e4bfb5
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Nov 28 14:28:54 2022 +0800

    [fix][broker] Revert "[fix][load-balancer] skip mis-configured resource usage(>100%) in load balancer" (#18645)
---
 .../impl/LeastResourceUsageWithWeight.java         |  2 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 10 ++-
 .../broker/loadbalance/impl/ThresholdShedder.java  | 80 +++-------------------
 .../loadbalance/impl/ThresholdShedderTest.java     |  6 --
 .../data/loadbalancer/LocalBrokerData.java         | 20 ------
 .../data/loadbalancer/LocalBrokerDataTest.java     |  5 --
 6 files changed, 16 insertions(+), 107 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
index 4763eaf23da..cb24eed233b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
@@ -91,7 +91,7 @@ public class LeastResourceUsageWithWeight implements 
ModularLoadManagerStrategy
                                                           ServiceConfiguration 
conf) {
         final double historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
         Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
-        double resourceUsage = 
brokerData.getLocalData().getMaxResourceUsageWithWeightWithinLimit(
+        double resourceUsage = 
brokerData.getLocalData().getMaxResourceUsageWithWeight(
                 conf.getLoadBalancerCPUResourceWeight(),
                 conf.getLoadBalancerMemoryResourceWeight(),
                 conf.getLoadBalancerDirectMemoryResourceWeight(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 0c86602695a..9444737fb79 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -448,9 +448,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - 
localData.getLastUpdate();
         if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
             log.info("Writing local data to metadata store because time since 
last"
-                            + " update exceeded threshold of {} minutes. 
ResourceUsage:[{}]",
-                    conf.getLoadBalancerReportUpdateMaxIntervalMinutes(),
-                    localData.printResourceUsage());
+                            + " update exceeded threshold of {} minutes",
+                    conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
             // Always update after surpassing the maximum interval.
             return true;
         }
@@ -464,10 +463,9 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                                         
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
         if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) 
{
             log.info("Writing local data to metadata store because maximum 
change {}% exceeded threshold {}%; "
-                            + "time since last report written is {} seconds. 
ResourceUsage:[{}]", maxChange,
+                            + "time since last report written is {} seconds", 
maxChange,
                     conf.getLoadBalancerReportUpdateThresholdPercentage(),
-                    timeSinceLastReportWrittenToStore / 1000.0,
-                    localData.printResourceUsage());
+                    timeSinceLastReportWrittenToStore / 1000.0);
             return true;
         }
         return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 928f045369c..8c0063a5d2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -56,37 +56,16 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
     private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;
 
     private static final double MB = 1024 * 1024;
-
-    private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
     private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
-    private long lastSampledLoadLogTS = 0;
-
-
-    private static int toPercentage(double usage) {
-        return (int) (usage * 100);
-    }
-
-    private boolean canSampleLog() {
-        long now = System.currentTimeMillis() / 1000;
-        boolean sampleLog = now - lastSampledLoadLogTS >= 
LOAD_LOG_SAMPLE_DELAY_IN_SEC;
-        if (sampleLog) {
-            lastSampledLoadLogTS = now;
-        }
-        return sampleLog;
-    }
 
     @Override
     public Multimap<String, String> findBundlesForUnloading(final LoadData 
loadData, final ServiceConfiguration conf) {
         selectedBundlesCache.clear();
-        boolean sampleLog = canSampleLog();
         final double threshold = 
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         final double minThroughputThreshold = 
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
 
-        final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
-        if (sampleLog) {
-            log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
-        }
+        final double avgUsage = getBrokerAvgUsage(loadData, 
conf.getLoadBalancerHistoryResourcePercentage(), conf);
 
         if (avgUsage == 0) {
             log.warn("average max resource usage is 0");
@@ -98,9 +77,8 @@ public class ThresholdShedder implements LoadSheddingStrategy 
{
             final double currentUsage = 
brokerAvgResourceUsage.getOrDefault(broker, 0.0);
 
             if (currentUsage < avgUsage + threshold) {
-                if (sampleLog) {
-                    log.info("[{}] broker is not overloaded, ignoring at this 
point, currentUsage:{}%",
-                            broker, toPercentage(currentUsage));
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] broker is not overloaded, ignoring at this 
point", broker);
                 }
                 return;
             }
@@ -111,13 +89,14 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
             double minimumThroughputToOffload = brokerCurrentThroughput * 
percentOfTrafficToOffload;
 
             if (minimumThroughputToOffload < minThroughputThreshold) {
-                if (sampleLog) {
-                    log.info("[{}] broker is planning to shed throughput {} 
MByte/s less than "
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] broker is planning to shed throughput {} 
MByte/s less than "
                                     + "minimumThroughputThreshold {} MByte/s, 
skipping bundle unload.",
                             broker, minimumThroughputToOffload / MB, 
minThroughputThreshold / MB);
                 }
                 return;
             }
+
             log.info(
                     "Attempting to shed load on {}, which has max resource 
usage above avgUsage  and threshold {}%"
                             + " > {}% + {}% -- Offloading at least {} MByte/s 
of traffic, left throughput {} MByte/s",
@@ -168,16 +147,15 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
                 });
     }
 
-    private double getBrokerAvgUsage(final LoadData loadData,
-                                     final ServiceConfiguration conf, boolean 
sampleLog) {
-        double historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
+    private double getBrokerAvgUsage(final LoadData loadData, final double 
historyPercentage,
+                                     final ServiceConfiguration conf) {
         double totalUsage = 0.0;
         int totalBrokers = 0;
 
         for (Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
             LocalBrokerData localBrokerData = entry.getValue().getLocalData();
             String broker = entry.getKey();
-            totalUsage += updateAvgResourceUsage(broker, localBrokerData, 
historyPercentage, conf, sampleLog);
+            totalUsage += updateAvgResourceUsage(broker, localBrokerData, 
historyPercentage, conf);
             totalBrokers++;
         }
 
@@ -185,8 +163,7 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
     }
 
     private double updateAvgResourceUsage(String broker, LocalBrokerData 
localBrokerData,
-                                          final double historyPercentage, 
final ServiceConfiguration conf,
-                                          boolean sampleLog) {
+                                          final double historyPercentage, 
final ServiceConfiguration conf) {
         Double historyUsage =
                 brokerAvgResourceUsage.get(broker);
         double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
@@ -194,41 +171,6 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
                 conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
                 conf.getLoadBalancerBandwithInResourceWeight(),
                 conf.getLoadBalancerBandwithOutResourceWeight());
-
-        if (sampleLog) {
-            log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
-                    broker,
-                    historyUsage == null ? 0 : toPercentage(historyUsage),
-                    toPercentage(resourceUsage));
-        }
-
-        // wrap if resourceUsage is bigger than 1.0
-        if (resourceUsage > 1.0) {
-            log.error("{} broker resourceUsage is bigger than 100%. "
-                            + "Some of the resource limits are mis-configured. 
"
-                            + "Try to disable the error resource signals by 
setting their weights to zero "
-                            + "or fix the resource limit configurations. "
-                            + 
"Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder
 "
-                            + "ResourceUsage:[{}], "
-                            + "CPUResourceWeight:{}, MemoryResourceWeight:{}, 
DirectMemoryResourceWeight:{}, "
-                            + "BandwithInResourceWeight:{}, 
BandwithOutResourceWeight:{}",
-                    broker,
-                    localBrokerData.printResourceUsage(),
-                    conf.getLoadBalancerCPUResourceWeight(),
-                    conf.getLoadBalancerMemoryResourceWeight(),
-                    conf.getLoadBalancerDirectMemoryResourceWeight(),
-                    conf.getLoadBalancerBandwithInResourceWeight(),
-                    conf.getLoadBalancerBandwithOutResourceWeight());
-
-            resourceUsage = 
localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
-                    conf.getLoadBalancerCPUResourceWeight(),
-                    conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
-                    conf.getLoadBalancerBandwithInResourceWeight(),
-                    conf.getLoadBalancerBandwithOutResourceWeight());
-
-            log.warn("{} broker recomputed max resourceUsage={}%. Skipped 
usage signals bigger than 100%",
-                    broker, toPercentage(resourceUsage));
-        }
         historyUsage = historyUsage == null
                 ? resourceUsage : historyUsage * historyPercentage + (1 - 
historyPercentage) * resourceUsage;
 
@@ -239,7 +181,7 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
     private void tryLowerBoundaryShedding(LoadData loadData, 
ServiceConfiguration conf) {
         // Select the broker with the most resource usage.
         final double threshold = 
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
-        final double avgUsage = getBrokerAvgUsage(loadData, conf, 
canSampleLog());
+        final double avgUsage = getBrokerAvgUsage(loadData, 
conf.getLoadBalancerHistoryResourcePercentage(), conf);
         Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, 
avgUsage);
         boolean hasBrokerBelowLowerBound = result.getLeft();
         String maxUsageBroker = result.getRight();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
index 8461f8ce74c..28c6b90759c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
@@ -74,9 +74,6 @@ public class ThresholdShedderTest {
         LoadData loadData = new LoadData();
 
         LocalBrokerData broker1 = new LocalBrokerData();
-        broker1.setCpu(new ResourceUsage(1000, 100));
-        broker1.setMemory(new ResourceUsage(5000, 100));
-        broker1.setDirectMemory(new ResourceUsage(5000, 100));
         broker1.setBandwidthIn(new ResourceUsage(500, 1000));
         broker1.setBandwidthOut(new ResourceUsage(500, 1000));
         broker1.setBundles(Sets.newHashSet("bundle-1"));
@@ -119,9 +116,6 @@ public class ThresholdShedderTest {
         LoadData loadData = new LoadData();
         
         LocalBrokerData broker1 = new LocalBrokerData();
-        broker1.setCpu(new ResourceUsage(1000, 100));
-        broker1.setMemory(new ResourceUsage(5000, 100));
-        broker1.setDirectMemory(new ResourceUsage(5000, 100));
         broker1.setBandwidthIn(new ResourceUsage(999, 1000));
         broker1.setBandwidthOut(new ResourceUsage(999, 1000));
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 3c97439f814..9af8f854642 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -256,16 +256,6 @@ public class LocalBrokerData implements LoadManagerReport {
                 bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
     }
 
-    public double getMaxResourceUsageWithWeightWithinLimit(final double 
cpuWeight, final double memoryWeight,
-                                                           final double 
directMemoryWeight,
-                                                           final double 
bandwidthInWeight,
-                                                           final double 
bandwidthOutWeight) {
-        return maxWithinLimit(100.0d,
-                cpu.percentUsage() * cpuWeight, memory.percentUsage() * 
memoryWeight,
-                directMemory.percentUsage() * directMemoryWeight, 
bandwidthIn.percentUsage() * bandwidthInWeight,
-                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
-    }
-
     private static double max(double... args) {
         double max = Double.NEGATIVE_INFINITY;
 
@@ -290,16 +280,6 @@ public class LocalBrokerData implements LoadManagerReport {
         return max;
     }
 
-    private static double maxWithinLimit(double limit, double...args) {
-        double max = 0.0;
-        for (double d : args) {
-            if (d > max && d <= limit) {
-                max = d;
-            }
-        }
-        return max;
-    }
-
     public String getLoadReportType() {
         return loadReportType;
     }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index 69d4a7f4cd1..8115a8064d1 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -53,10 +53,5 @@ public class LocalBrokerDataTest {
         assertEquals(
                 data.getMaxResourceUsageWithWeight(
                         weight, weight, weight, weight, weight), 2.0, epsilon);
-
-        assertEquals(
-                data.getMaxResourceUsageWithWeightWithinLimit(
-                        weight, weight, weight, weight, weight), 0.02, 
epsilon);
-
     }
 }

Reply via email to