This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c0a823a242a [fix][broker] Revert "[fix][load-balancer] skip
mis-configured resource usage(>100%) in load balancer" (#18645)
c0a823a242a is described below
commit c0a823a242a834dac35b9a6fcd6a2064a0e4bfb5
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Nov 28 14:28:54 2022 +0800
[fix][broker] Revert "[fix][load-balancer] skip mis-configured resource
usage(>100%) in load balancer" (#18645)
---
.../impl/LeastResourceUsageWithWeight.java | 2 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 10 ++-
.../broker/loadbalance/impl/ThresholdShedder.java | 80 +++-------------------
.../loadbalance/impl/ThresholdShedderTest.java | 6 --
.../data/loadbalancer/LocalBrokerData.java | 20 ------
.../data/loadbalancer/LocalBrokerDataTest.java | 5 --
6 files changed, 16 insertions(+), 107 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
index 4763eaf23da..cb24eed233b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java
@@ -91,7 +91,7 @@ public class LeastResourceUsageWithWeight implements
ModularLoadManagerStrategy
ServiceConfiguration
conf) {
final double historyPercentage =
conf.getLoadBalancerHistoryResourcePercentage();
Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
- double resourceUsage =
brokerData.getLocalData().getMaxResourceUsageWithWeightWithinLimit(
+ double resourceUsage =
brokerData.getLocalData().getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 0c86602695a..9444737fb79 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -448,9 +448,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
long timeSinceLastReportWrittenToStore = System.currentTimeMillis() -
localData.getLastUpdate();
if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
log.info("Writing local data to metadata store because time since
last"
- + " update exceeded threshold of {} minutes.
ResourceUsage:[{}]",
- conf.getLoadBalancerReportUpdateMaxIntervalMinutes(),
- localData.printResourceUsage());
+ + " update exceeded threshold of {} minutes",
+ conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
// Always update after surpassing the maximum interval.
return true;
}
@@ -464,10 +463,9 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage())
{
log.info("Writing local data to metadata store because maximum
change {}% exceeded threshold {}%; "
- + "time since last report written is {} seconds.
ResourceUsage:[{}]", maxChange,
+ + "time since last report written is {} seconds",
maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage(),
- timeSinceLastReportWrittenToStore / 1000.0,
- localData.printResourceUsage());
+ timeSinceLastReportWrittenToStore / 1000.0);
return true;
}
return false;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 928f045369c..8c0063a5d2b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -56,37 +56,16 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
private static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;
private static final double MB = 1024 * 1024;
-
- private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
- private long lastSampledLoadLogTS = 0;
-
-
- private static int toPercentage(double usage) {
- return (int) (usage * 100);
- }
-
- private boolean canSampleLog() {
- long now = System.currentTimeMillis() / 1000;
- boolean sampleLog = now - lastSampledLoadLogTS >=
LOAD_LOG_SAMPLE_DELAY_IN_SEC;
- if (sampleLog) {
- lastSampledLoadLogTS = now;
- }
- return sampleLog;
- }
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData
loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
- boolean sampleLog = canSampleLog();
final double threshold =
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold =
conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
- final double avgUsage = getBrokerAvgUsage(loadData, conf, sampleLog);
- if (sampleLog) {
- log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
- }
+ final double avgUsage = getBrokerAvgUsage(loadData,
conf.getLoadBalancerHistoryResourcePercentage(), conf);
if (avgUsage == 0) {
log.warn("average max resource usage is 0");
@@ -98,9 +77,8 @@ public class ThresholdShedder implements LoadSheddingStrategy
{
final double currentUsage =
brokerAvgResourceUsage.getOrDefault(broker, 0.0);
if (currentUsage < avgUsage + threshold) {
- if (sampleLog) {
- log.info("[{}] broker is not overloaded, ignoring at this
point, currentUsage:{}%",
- broker, toPercentage(currentUsage));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] broker is not overloaded, ignoring at this
point", broker);
}
return;
}
@@ -111,13 +89,14 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
double minimumThroughputToOffload = brokerCurrentThroughput *
percentOfTrafficToOffload;
if (minimumThroughputToOffload < minThroughputThreshold) {
- if (sampleLog) {
- log.info("[{}] broker is planning to shed throughput {}
MByte/s less than "
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] broker is planning to shed throughput {}
MByte/s less than "
+ "minimumThroughputThreshold {} MByte/s,
skipping bundle unload.",
broker, minimumThroughputToOffload / MB,
minThroughputThreshold / MB);
}
return;
}
+
log.info(
"Attempting to shed load on {}, which has max resource
usage above avgUsage and threshold {}%"
+ " > {}% + {}% -- Offloading at least {} MByte/s
of traffic, left throughput {} MByte/s",
@@ -168,16 +147,15 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
});
}
- private double getBrokerAvgUsage(final LoadData loadData,
- final ServiceConfiguration conf, boolean
sampleLog) {
- double historyPercentage =
conf.getLoadBalancerHistoryResourcePercentage();
+ private double getBrokerAvgUsage(final LoadData loadData, final double
historyPercentage,
+ final ServiceConfiguration conf) {
double totalUsage = 0.0;
int totalBrokers = 0;
for (Map.Entry<String, BrokerData> entry :
loadData.getBrokerData().entrySet()) {
LocalBrokerData localBrokerData = entry.getValue().getLocalData();
String broker = entry.getKey();
- totalUsage += updateAvgResourceUsage(broker, localBrokerData,
historyPercentage, conf, sampleLog);
+ totalUsage += updateAvgResourceUsage(broker, localBrokerData,
historyPercentage, conf);
totalBrokers++;
}
@@ -185,8 +163,7 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
}
private double updateAvgResourceUsage(String broker, LocalBrokerData
localBrokerData,
- final double historyPercentage,
final ServiceConfiguration conf,
- boolean sampleLog) {
+ final double historyPercentage,
final ServiceConfiguration conf) {
Double historyUsage =
brokerAvgResourceUsage.get(broker);
double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
@@ -194,41 +171,6 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
-
- if (sampleLog) {
- log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
- broker,
- historyUsage == null ? 0 : toPercentage(historyUsage),
- toPercentage(resourceUsage));
- }
-
- // wrap if resourceUsage is bigger than 1.0
- if (resourceUsage > 1.0) {
- log.error("{} broker resourceUsage is bigger than 100%. "
- + "Some of the resource limits are mis-configured.
"
- + "Try to disable the error resource signals by
setting their weights to zero "
- + "or fix the resource limit configurations. "
- +
"Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder
"
- + "ResourceUsage:[{}], "
- + "CPUResourceWeight:{}, MemoryResourceWeight:{},
DirectMemoryResourceWeight:{}, "
- + "BandwithInResourceWeight:{},
BandwithOutResourceWeight:{}",
- broker,
- localBrokerData.printResourceUsage(),
- conf.getLoadBalancerCPUResourceWeight(),
- conf.getLoadBalancerMemoryResourceWeight(),
- conf.getLoadBalancerDirectMemoryResourceWeight(),
- conf.getLoadBalancerBandwithInResourceWeight(),
- conf.getLoadBalancerBandwithOutResourceWeight());
-
- resourceUsage =
localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
- conf.getLoadBalancerCPUResourceWeight(),
- conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
- conf.getLoadBalancerBandwithInResourceWeight(),
- conf.getLoadBalancerBandwithOutResourceWeight());
-
- log.warn("{} broker recomputed max resourceUsage={}%. Skipped
usage signals bigger than 100%",
- broker, toPercentage(resourceUsage));
- }
historyUsage = historyUsage == null
? resourceUsage : historyUsage * historyPercentage + (1 -
historyPercentage) * resourceUsage;
@@ -239,7 +181,7 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
private void tryLowerBoundaryShedding(LoadData loadData,
ServiceConfiguration conf) {
// Select the broker with the most resource usage.
final double threshold =
conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
- final double avgUsage = getBrokerAvgUsage(loadData, conf,
canSampleLog());
+ final double avgUsage = getBrokerAvgUsage(loadData,
conf.getLoadBalancerHistoryResourcePercentage(), conf);
Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold,
avgUsage);
boolean hasBrokerBelowLowerBound = result.getLeft();
String maxUsageBroker = result.getRight();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
index 8461f8ce74c..28c6b90759c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
@@ -74,9 +74,6 @@ public class ThresholdShedderTest {
LoadData loadData = new LoadData();
LocalBrokerData broker1 = new LocalBrokerData();
- broker1.setCpu(new ResourceUsage(1000, 100));
- broker1.setMemory(new ResourceUsage(5000, 100));
- broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(500, 1000));
broker1.setBandwidthOut(new ResourceUsage(500, 1000));
broker1.setBundles(Sets.newHashSet("bundle-1"));
@@ -119,9 +116,6 @@ public class ThresholdShedderTest {
LoadData loadData = new LoadData();
LocalBrokerData broker1 = new LocalBrokerData();
- broker1.setCpu(new ResourceUsage(1000, 100));
- broker1.setMemory(new ResourceUsage(5000, 100));
- broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(999, 1000));
broker1.setBandwidthOut(new ResourceUsage(999, 1000));
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 3c97439f814..9af8f854642 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -256,16 +256,6 @@ public class LocalBrokerData implements LoadManagerReport {
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}
- public double getMaxResourceUsageWithWeightWithinLimit(final double
cpuWeight, final double memoryWeight,
- final double
directMemoryWeight,
- final double
bandwidthInWeight,
- final double
bandwidthOutWeight) {
- return maxWithinLimit(100.0d,
- cpu.percentUsage() * cpuWeight, memory.percentUsage() *
memoryWeight,
- directMemory.percentUsage() * directMemoryWeight,
bandwidthIn.percentUsage() * bandwidthInWeight,
- bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
- }
-
private static double max(double... args) {
double max = Double.NEGATIVE_INFINITY;
@@ -290,16 +280,6 @@ public class LocalBrokerData implements LoadManagerReport {
return max;
}
- private static double maxWithinLimit(double limit, double...args) {
- double max = 0.0;
- for (double d : args) {
- if (d > max && d <= limit) {
- max = d;
- }
- }
- return max;
- }
-
public String getLoadReportType() {
return loadReportType;
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index 69d4a7f4cd1..8115a8064d1 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -53,10 +53,5 @@ public class LocalBrokerDataTest {
assertEquals(
data.getMaxResourceUsageWithWeight(
weight, weight, weight, weight, weight), 2.0, epsilon);
-
- assertEquals(
- data.getMaxResourceUsageWithWeightWithinLimit(
- weight, weight, weight, weight, weight), 0.02,
epsilon);
-
}
}