This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7a5c77c55 [improve][broker] PIP-192 Moved the common broker load
data feature(weightedMaxEMA) to BrokerLoadData (#19154)
9e7a5c77c55 is described below
commit 9e7a5c77c55cce5adc13b305896e99c2d900056c
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jan 17 20:42:47 2023 -0800
[improve][broker] PIP-192 Moved the common broker load data
feature(weightedMaxEMA) to BrokerLoadData (#19154)
---
.../extensions/data/BrokerLoadData.java | 131 +++++++++++++----
.../strategy/LeastResourceUsageWithWeight.java | 79 +++-------
.../extensions/data/BrokerLoadDataTest.java | 122 +++++++++++++--
.../strategy/LeastResourceUsageWithWeightTest.java | 163 ++++++++++-----------
4 files changed, 312 insertions(+), 183 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
index fbb5093939e..39419946992 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;
-import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -29,24 +31,42 @@ import
org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
* Migrate from {@link
org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
* And removed the lookup data, see {@link BrokerLookupData}
*/
-@Data
+@Getter
+@EqualsAndHashCode
public class BrokerLoadData {
+ private static final double DEFAULT_RESOURCE_USAGE = 1.0d;
+
// Most recently available system resource usage.
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;
-
private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;
// Message data from the most recent namespace bundle stats.
- private double msgThroughputIn;
- private ResourceUsage msgThroughputInUsage;
- private double msgThroughputOut;
- private ResourceUsage msgThroughputOutUsage;
- private double msgRateIn;
- private double msgRateOut;
+ private double msgThroughputIn; // bytes/sec
+ private double msgThroughputOut; // bytes/sec
+ private double msgRateIn; // messages/sec
+ private double msgRateOut; // messages/sec
+
+ // Load data features computed from the above resources.
+ private double maxResourceUsage; // max of resource usages
+ /**
+ * Exponential moving average(EMA) of max of weighted resource usages among
+ * cpu, memory, directMemory, bandwidthIn and bandwidthOut.
+ *
+ * The resource weights are configured by :
+ * loadBalancerCPUResourceWeight,
+ * loadBalancerMemoryResourceWeight,
+ * loadBalancerDirectMemoryResourceWeight,
+ * loadBalancerBandwithInResourceWeight, and
+ * loadBalancerBandwithOutResourceWeight.
+ *
+ * The historical resource percentage is configured by
loadBalancerHistoryResourcePercentage.
+ */
+ private double weightedMaxEMA;
+ private long updatedAt;
public BrokerLoadData() {
cpu = new ResourceUsage();
@@ -54,34 +74,56 @@ public class BrokerLoadData {
directMemory = new ResourceUsage();
bandwidthIn = new ResourceUsage();
bandwidthOut = new ResourceUsage();
- msgThroughputInUsage = new ResourceUsage();
- msgThroughputOutUsage = new ResourceUsage();
+ maxResourceUsage = DEFAULT_RESOURCE_USAGE;
+ weightedMaxEMA = DEFAULT_RESOURCE_USAGE;
}
/**
- * Using the system resource usage and bundle stats acquired from the
Pulsar client, update this LocalBrokerData.
+ * Using the system resource usage from the Pulsar client, update this
BrokerLoadData.
*
- * @param systemResourceUsage
+ * @param usage
* System resource usage (cpu, memory, and direct memory).
+ * @param msgThroughputIn
+ * broker-level message input throughput in bytes/s.
+ * @param msgThroughputOut
+ * broker-level message output throughput in bytes/s.
+ * @param msgRateIn
+ * broker-level message input rate in messages/s.
+ * @param msgRateOut
+ * broker-level message output rate in messages/s.
+ * @param conf
+ * Service configuration to compute load data features.
*/
- public void update(final SystemResourceUsage systemResourceUsage) {
- updateSystemResourceUsage(systemResourceUsage);
+ public void update(final SystemResourceUsage usage,
+ double msgThroughputIn,
+ double msgThroughputOut,
+ double msgRateIn,
+ double msgRateOut,
+ ServiceConfiguration conf) {
+ updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory,
usage.bandwidthIn, usage.bandwidthOut);
+ this.msgThroughputIn = msgThroughputIn;
+ this.msgThroughputOut = msgThroughputOut;
+ this.msgRateIn = msgRateIn;
+ this.msgRateOut = msgRateOut;
+ updateFeatures(conf);
+ updatedAt = System.currentTimeMillis();
}
/**
- * Using another LocalBrokerData, update this.
+ * Using another BrokerLoadData, update this.
*
* @param other
- * LocalBrokerData to update from.
+ * BrokerLoadData to update from.
*/
public void update(final BrokerLoadData other) {
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory,
other.bandwidthIn, other.bandwidthOut);
- }
-
- // Set the cpu, memory, and direct memory to that of the new system
resource usage data.
- private void updateSystemResourceUsage(final SystemResourceUsage
systemResourceUsage) {
- updateSystemResourceUsage(systemResourceUsage.cpu,
systemResourceUsage.memory, systemResourceUsage.directMemory,
- systemResourceUsage.bandwidthIn,
systemResourceUsage.bandwidthOut);
+ msgThroughputIn = other.msgThroughputIn;
+ msgThroughputOut = other.msgThroughputOut;
+ msgRateIn = other.msgRateIn;
+ msgRateOut = other.msgRateOut;
+ weightedMaxEMA = other.weightedMaxEMA;
+ maxResourceUsage = other.maxResourceUsage;
+ updatedAt = other.updatedAt;
}
// Update resource usage given each individual usage.
@@ -95,12 +137,18 @@ public class BrokerLoadData {
this.bandwidthOut = bandwidthOut;
}
- public double getMaxResourceUsage() {
- return LocalBrokerData.max(cpu.percentUsage(),
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+ private void updateFeatures(ServiceConfiguration conf) {
+ updateMaxResourceUsage();
+ updateWeightedMaxEMA(conf);
+ }
+
+ private void updateMaxResourceUsage() {
+ maxResourceUsage = LocalBrokerData.max(cpu.percentUsage(),
directMemory.percentUsage(),
+ bandwidthIn.percentUsage(),
bandwidthOut.percentUsage()) / 100;
}
- public double getMaxResourceUsageWithWeight(final double cpuWeight, final
double memoryWeight,
+ private double getMaxResourceUsageWithWeight(final double cpuWeight, final
double memoryWeight,
final double
directMemoryWeight, final double bandwidthInWeight,
final double
bandwidthOutWeight) {
return LocalBrokerData.max(cpu.percentUsage() * cpuWeight,
memory.percentUsage() * memoryWeight,
@@ -108,4 +156,35 @@ public class BrokerLoadData {
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}
+ private void updateWeightedMaxEMA(ServiceConfiguration conf) {
+ var historyPercentage =
conf.getLoadBalancerHistoryResourcePercentage();
+ var weightedMax = getMaxResourceUsageWithWeight(
+ conf.getLoadBalancerCPUResourceWeight(),
+ conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
+ conf.getLoadBalancerBandwithInResourceWeight(),
+ conf.getLoadBalancerBandwithOutResourceWeight());
+ weightedMaxEMA = updatedAt == 0 ? weightedMax :
+ weightedMaxEMA * historyPercentage + (1 - historyPercentage) *
weightedMax;
+ }
+
+ public String toString(ServiceConfiguration conf) {
+ return String.format("cpu= %.2f%%, memory= %.2f%%, directMemory=
%.2f%%, "
+ + "bandwithIn= %.2f%%, bandwithOut= %.2f%%, "
+ + "cpuWeight= %f, memoryWeight= %f,
directMemoryWeight= %f, "
+ + "bandwithInResourceWeight= %f,
bandwithOutResourceWeight= %f, "
+ + "msgThroughputIn= %.2f, msgThroughputOut= %.2f,
msgRateIn= %.2f, msgRateOut= %.2f, "
+ + "maxResourceUsage= %.2f%%, weightedMaxEMA= %.2f%%,
updatedAt= %d",
+
+ cpu.percentUsage(), memory.percentUsage(),
directMemory.percentUsage(),
+ bandwidthIn.percentUsage(), bandwidthOut.percentUsage(),
+ conf.getLoadBalancerCPUResourceWeight(),
+ conf.getLoadBalancerMemoryResourceWeight(),
+ conf.getLoadBalancerDirectMemoryResourceWeight(),
+ conf.getLoadBalancerBandwithInResourceWeight(),
+ conf.getLoadBalancerBandwithOutResourceWeight(),
+ msgThroughputIn, msgThroughputOut, msgRateIn, msgRateOut,
+ maxResourceUsage * 100, weightedMaxEMA * 100, updatedAt
+ );
+ }
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index 829dc7ce12a..f48bab54f89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -19,10 +19,8 @@
package org.apache.pulsar.broker.loadbalance.extensions.strategy;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -39,15 +37,12 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
*/
@Slf4j
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
- private static final double MAX_RESOURCE_USAGE = 1.0d;
// Maintain this list to reduce object creation.
private final ArrayList<String> bestBrokers;
private final Set<String> noLoadDataBrokers;
- private final Map<String, Double> brokerAvgResourceUsageWithWeight;
public LeastResourceUsageWithWeight() {
this.bestBrokers = new ArrayList<>();
- this.brokerAvgResourceUsageWithWeight = new HashMap<>();
this.noLoadDataBrokers = new HashSet<>();
}
@@ -55,65 +50,24 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
private double getMaxResourceUsageWithWeight(final String broker, final
BrokerLoadData brokerLoadData,
final ServiceConfiguration
conf, boolean debugMode) {
final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
- final double maxUsageWithWeight =
- updateAndGetMaxResourceUsageWithWeight(broker, brokerLoadData,
conf, debugMode);
+ final var maxUsageWithWeight = brokerLoadData.getWeightedMaxEMA();
+
if (maxUsageWithWeight > overloadThreshold) {
log.warn(
- "Broker {} is overloaded, max resource usage with weight
percentage: {}%, "
- + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%,
BANDWIDTH IN: {}%, "
- + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY
weight: {}, DIRECT MEMORY weight: {}, "
- + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight:
{}",
- broker, maxUsageWithWeight * 100,
- brokerLoadData.getCpu().percentUsage(),
brokerLoadData.getMemory().percentUsage(),
- brokerLoadData.getDirectMemory().percentUsage(),
brokerLoadData.getBandwidthIn().percentUsage(),
- brokerLoadData.getBandwidthOut().percentUsage(),
conf.getLoadBalancerCPUResourceWeight(),
- conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
- conf.getLoadBalancerBandwithInResourceWeight(),
- conf.getLoadBalancerBandwithOutResourceWeight());
+ "Broker {} is overloaded, brokerLoad({}%) >
overloadThreshold({}%). load data:{{}}",
+ broker,
+ maxUsageWithWeight * 100,
+ overloadThreshold * 100,
+ brokerLoadData.toString(conf));
+ } else if (debugMode) {
+ log.info("Broker {} load data:{{}}", broker,
brokerLoadData.toString(conf));
}
- if (debugMode) {
- log.info("Broker {} has max resource usage with weight percentage:
{}%",
- broker, maxUsageWithWeight * 100);
- }
+
return maxUsageWithWeight;
}
- /**
- * Update and get the max resource usage with weight of broker according
to the service configuration.
- *
- * @param broker The broker name.
- * @param brokerData The broker load data.
- * @param conf The service configuration.
- * @param debugMode The debug mode to print computed load states and
decision information.
- * @return the max resource usage with weight of broker
- */
- private double updateAndGetMaxResourceUsageWithWeight(String broker,
BrokerLoadData brokerData,
- ServiceConfiguration
conf, boolean debugMode) {
- final double historyPercentage =
conf.getLoadBalancerHistoryResourcePercentage();
- Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
- double resourceUsage = brokerData.getMaxResourceUsageWithWeight(
- conf.getLoadBalancerCPUResourceWeight(),
- conf.getLoadBalancerMemoryResourceWeight(),
- conf.getLoadBalancerDirectMemoryResourceWeight(),
- conf.getLoadBalancerBandwithInResourceWeight(),
- conf.getLoadBalancerBandwithOutResourceWeight());
- historyUsage = historyUsage == null
- ? resourceUsage : historyUsage * historyPercentage + (1 -
historyPercentage) * resourceUsage;
- if (debugMode) {
- log.info(
- "Broker {} get max resource usage with weight: {}, history
resource percentage: {}%, CPU weight: "
- + "{}, MEMORY weight: {}, DIRECT MEMORY weight:
{}, BANDWIDTH IN weight: {}, BANDWIDTH "
- + "OUT weight: {} ",
- broker, historyUsage, historyPercentage,
conf.getLoadBalancerCPUResourceWeight(),
- conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
- conf.getLoadBalancerBandwithInResourceWeight(),
- conf.getLoadBalancerBandwithOutResourceWeight());
- }
- brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
- return historyUsage;
- }
/**
* Find a suitable broker to assign the given bundle to.
@@ -143,7 +97,7 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
for (String broker : candidates) {
var brokerLoadDataOptional =
context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
- log.warn("There is no broker load data for broker:{}. Skipping
this broker.", broker);
+ log.warn("There is no broker load data for broker:{}. Skipping
this broker. Phase one", broker);
noLoadDataBrokers.add(broker);
continue;
}
@@ -162,12 +116,17 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
if (debugMode) {
log.info("Computed avgUsage:{}, diffThreshold:{}", avgUsage,
diffThreshold);
}
- candidates.forEach(broker -> {
- Double avgResUsage =
brokerAvgResourceUsageWithWeight.getOrDefault(broker, MAX_RESOURCE_USAGE);
+ for (String broker : candidates) {
+ var brokerLoadDataOptional =
context.brokerLoadDataStore().get(broker);
+ if (brokerLoadDataOptional.isEmpty()) {
+ log.warn("There is no broker load data for broker:{}.
Skipping this broker. Phase two", broker);
+ continue;
+ }
+ double avgResUsage =
brokerLoadDataOptional.get().getWeightedMaxEMA();
if ((avgResUsage + diffThreshold <= avgUsage &&
!noLoadDataBrokers.contains(broker))) {
bestBrokers.add(broker);
}
- });
+ }
}
if (bestBrokers.isEmpty()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
index 58805af922b..cedf7bca5d5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
@@ -18,9 +18,13 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testng.annotations.Test;
/**
@@ -31,21 +35,109 @@ import org.testng.annotations.Test;
public class BrokerLoadDataTest {
@Test
- public void testMaxResourceUsage() {
+ public void testUpdateBySystemResourceUsage() {
+
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setLoadBalancerCPUResourceWeight(0.5);
+ conf.setLoadBalancerMemoryResourceWeight(0.5);
+ conf.setLoadBalancerDirectMemoryResourceWeight(0.5);
+ conf.setLoadBalancerBandwithInResourceWeight(0.5);
+ conf.setLoadBalancerBandwithOutResourceWeight(0.5);
+ conf.setLoadBalancerHistoryResourcePercentage(0.75);
+
BrokerLoadData data = new BrokerLoadData();
- data.setCpu(new ResourceUsage(1.0, 100.0));
- data.setMemory(new ResourceUsage(800.0, 200.0));
- data.setDirectMemory(new ResourceUsage(2.0, 100.0));
- data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
- data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
-
- double epsilon = 0.00001;
- double weight = 0.5;
- // skips memory usage
- assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
-
- assertEquals(
- data.getMaxResourceUsageWithWeight(
- weight, weight, weight, weight, weight), 2.0, epsilon);
+
+ long now = System.currentTimeMillis();
+ SystemResourceUsage usage1 = new SystemResourceUsage();
+ var cpu = new ResourceUsage(1.0, 100.0);
+ var memory = new ResourceUsage(800.0, 200.0);
+ var directMemory= new ResourceUsage(2.0, 100.0);
+ var bandwidthIn= new ResourceUsage(3.0, 100.0);
+ var bandwidthOut= new ResourceUsage(4.0, 100.0);
+ usage1.setCpu(cpu);
+ usage1.setMemory(memory);
+ usage1.setDirectMemory(directMemory);
+ usage1.setBandwidthIn(bandwidthIn);
+ usage1.setBandwidthOut(bandwidthOut);
+ data.update(usage1, 1,2,3,4, conf);
+
+ assertEquals(data.getCpu(), cpu);
+ assertEquals(data.getMemory(), memory);
+ assertEquals(data.getDirectMemory(), directMemory);
+ assertEquals(data.getBandwidthIn(), bandwidthIn);
+ assertEquals(data.getBandwidthOut(), bandwidthOut);
+ assertEquals(data.getMsgThroughputIn(), 1.0);
+ assertEquals(data.getMsgThroughputOut(), 2.0);
+ assertEquals(data.getMsgRateIn(), 3.0);
+ assertEquals(data.getMsgRateOut(), 4.0);
+ assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage
+ assertEquals(data.getWeightedMaxEMA(), 2);
+ assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
+
+ now = System.currentTimeMillis();
+ SystemResourceUsage usage2 = new SystemResourceUsage();
+ cpu = new ResourceUsage(300.0, 100.0);
+ memory = new ResourceUsage(200.0, 200.0);
+ directMemory= new ResourceUsage(2.0, 100.0);
+ bandwidthIn= new ResourceUsage(3.0, 100.0);
+ bandwidthOut= new ResourceUsage(4.0, 100.0);
+ usage2.setCpu(cpu);
+ usage2.setMemory(memory);
+ usage2.setDirectMemory(directMemory);
+ usage2.setBandwidthIn(bandwidthIn);
+ usage2.setBandwidthOut(bandwidthOut);
+ data.update(usage2, 5,6,7,8, conf);
+
+ assertEquals(data.getCpu(), cpu);
+ assertEquals(data.getMemory(), memory);
+ assertEquals(data.getDirectMemory(), directMemory);
+ assertEquals(data.getBandwidthIn(), bandwidthIn);
+ assertEquals(data.getBandwidthOut(), bandwidthOut);
+ assertEquals(data.getMsgThroughputIn(), 5.0);
+ assertEquals(data.getMsgThroughputOut(), 6.0);
+ assertEquals(data.getMsgRateIn(), 7.0);
+ assertEquals(data.getMsgRateOut(), 8.0);
+ assertEquals(data.getMaxResourceUsage(), 3.0);
+ assertEquals(data.getWeightedMaxEMA(), 1.875);
+ assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
+ assertEquals(data.toString(conf), "cpu= 300.00%, memory= 100.00%,
directMemory= 2.00%, "
+ + "bandwithIn= 3.00%, bandwithOut= 4.00%, "
+ + "cpuWeight= 0.500000, memoryWeight= 0.500000,
directMemoryWeight= 0.500000, "
+ + "bandwithInResourceWeight= 0.500000,
bandwithOutResourceWeight= 0.500000, "
+ + "msgThroughputIn= 5.00, msgThroughputOut= 6.00, "
+ + "msgRateIn= 7.00, msgRateOut= 8.00,"
+ + " maxResourceUsage= 300.00%, weightedMaxEMA= 187.50%,
updatedAt= " + data.getUpdatedAt());
}
+
+ @Test
+ public void testUpdateByBrokerLoadData() {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setLoadBalancerCPUResourceWeight(0.5);
+ conf.setLoadBalancerMemoryResourceWeight(0.5);
+ conf.setLoadBalancerDirectMemoryResourceWeight(0.5);
+ conf.setLoadBalancerBandwithInResourceWeight(0.5);
+ conf.setLoadBalancerBandwithOutResourceWeight(0.5);
+ conf.setLoadBalancerHistoryResourcePercentage(0.75);
+
+ BrokerLoadData data = new BrokerLoadData();
+ BrokerLoadData other = new BrokerLoadData();
+
+ SystemResourceUsage usage1 = new SystemResourceUsage();
+ var cpu = new ResourceUsage(1.0, 100.0);
+ var memory = new ResourceUsage(800.0, 200.0);
+ var directMemory= new ResourceUsage(2.0, 100.0);
+ var bandwidthIn= new ResourceUsage(3.0, 100.0);
+ var bandwidthOut= new ResourceUsage(4.0, 100.0);
+ usage1.setCpu(cpu);
+ usage1.setMemory(memory);
+ usage1.setDirectMemory(directMemory);
+ usage1.setBandwidthIn(bandwidthIn);
+ usage1.setBandwidthOut(bandwidthOut);
+ other.update(usage1, 1,2,3,4, conf);
+ data.update(other);
+
+ assertEquals(data, other);
+ }
+
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index 6a260dff72d..db3d8f9304c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -33,7 +32,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
+import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
@@ -42,13 +41,13 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class LeastResourceUsageWithWeightTest {
// Test that least resource usage with weight works correctly.
-
ServiceUnitId bundleData = new ServiceUnitId() {
@Override
public NamespaceName getNamespaceObject() {
@@ -65,10 +64,10 @@ public class LeastResourceUsageWithWeightTest {
var ctx = getContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(10, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker3", initBrokerData(60, 100));
- brokerLoadDataStore.pushAsync("broker4", initBrokerData(5, 100));
+ brokerLoadDataStore.pushAsync("1", createBrokerData(ctx, 10, 100));
+ brokerLoadDataStore.pushAsync("2", createBrokerData(ctx, 30, 100));
+ brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 60, 100));
+ brokerLoadDataStore.pushAsync("4", createBrokerData(ctx, 5, 100));
return ctx;
}
@@ -76,85 +75,65 @@ public class LeastResourceUsageWithWeightTest {
public void testSelect() {
var ctx = setupContext();
- ServiceConfiguration conf = ctx.brokerConfiguration();
- conf.setLoadBalancerCPUResourceWeight(1.0);
- conf.setLoadBalancerMemoryResourceWeight(0.1);
- conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
- conf.setLoadBalancerBandwithInResourceWeight(1.0);
- conf.setLoadBalancerBandwithOutResourceWeight(1.0);
- conf.setLoadBalancerHistoryResourcePercentage(0.5);
-
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
-
LeastResourceUsageWithWeight strategy = new
LeastResourceUsageWithWeight();
-
- var brokers = ctx.brokerLoadDataStore().entrySet().stream()
- .map(e -> e.getKey()).collect(Collectors.toList());
- // Make brokerAvgResourceUsageWithWeight contain broker4.
- strategy.select(brokers, bundleData, ctx);
-
// Should choice broker from broker1 2 3.
List<String> candidates = new ArrayList<>();
- candidates.add("broker1");
- candidates.add("broker2");
- candidates.add("broker3");
+ candidates.add("1");
+ candidates.add("2");
+ candidates.add("3");
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker1"));
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("1"));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(20, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker3", initBrokerData(50, 100));
- brokerLoadDataStore.pushAsync("broker4", null);
+ brokerLoadDataStore.pushAsync("1", createBrokerData(ctx, 20, 100));
+ brokerLoadDataStore.pushAsync("2", createBrokerData(ctx, 30, 100));
+ brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 50, 100));
+
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("1"));
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker1"));
+ updateLoad(ctx, "1", 30);
+ updateLoad(ctx, "2", 30);
+ updateLoad(ctx, "3", 40);
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker3", initBrokerData(40, 100));
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("1"));
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker1"));
+ updateLoad(ctx, "1", 30);
+ updateLoad(ctx, "2", 30);
+ updateLoad(ctx, "3", 40);
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
- brokerLoadDataStore.pushAsync("broker3", initBrokerData(40, 100));
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("1"));
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker1"));
+ updateLoad(ctx, "1", 35);
+ updateLoad(ctx, "2", 20);
+ updateLoad(ctx, "3", 45);
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(35, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(20, 100));
- brokerLoadDataStore.pushAsync("broker3", initBrokerData(45, 100));
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("2"));
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker2"));
+ // test restart broker can load bundle as one of the best brokers.
+ updateLoad(ctx, "1", 35);
+ updateLoad(ctx, "2", 20);
+ brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 0, 100));
+
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("3"));
}
public void testArithmeticException()
throws NoSuchFieldException, IllegalAccessException {
var ctx = setupContext();
- var conf = ctx.brokerConfiguration();
- conf.setLoadBalancerCPUResourceWeight(1.0);
- conf.setLoadBalancerMemoryResourceWeight(0.1);
- conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
- conf.setLoadBalancerBandwithInResourceWeight(1.0);
- conf.setLoadBalancerBandwithOutResourceWeight(1.0);
- conf.setLoadBalancerHistoryResourcePercentage(0.5);
-
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
-
+ var brokerLoadStore = ctx.brokerLoadDataStore();
LeastResourceUsageWithWeight strategy = new
LeastResourceUsageWithWeight();
// Should choice broker from broker1 2 3.
List<String> candidates = new ArrayList<>();
- candidates.add("broker1");
- candidates.add("broker2");
- candidates.add("broker3");
- Field strategyUpdater =
LeastResourceUsageWithWeight.class.getDeclaredField("brokerAvgResourceUsageWithWeight");
- strategyUpdater.setAccessible(true);
- Map<String, Double> brokerAvgResourceUsageWithWeight = new HashMap<>();
- brokerAvgResourceUsageWithWeight.put("broker1", 0.1d);
- brokerAvgResourceUsageWithWeight.put("broker2", 0.3d);
- brokerAvgResourceUsageWithWeight.put("broker4", 0.05d);
- strategyUpdater.set(strategy, brokerAvgResourceUsageWithWeight);
- assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("broker1"));
+ candidates.add("1");
+ candidates.add("2");
+ candidates.add("3");
+
+ FieldUtils.writeDeclaredField(brokerLoadStore.get("1").get(),
"weightedMaxEMA", 0.1d, true);
+ FieldUtils.writeDeclaredField(brokerLoadStore.get("2").get(),
"weightedMaxEMA", 0.3d, true);
+ FieldUtils.writeDeclaredField(brokerLoadStore.get("4").get(),
"weightedMaxEMA", 0.05d, true);
+ assertEquals(strategy.select(candidates, bundleData, ctx),
Optional.of("1"));
}
public void testNoLoadDataBrokers() {
@@ -164,42 +143,62 @@ public class LeastResourceUsageWithWeightTest {
List<String> candidates = new ArrayList<>();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(50, 100));
- brokerLoadDataStore.pushAsync("broker2", initBrokerData(100, 100));
- brokerLoadDataStore.pushAsync("broker3", null);
- brokerLoadDataStore.pushAsync("broker4", null);
- candidates.add("broker1");
- candidates.add("broker2");
- candidates.add("broker5");
+ brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,50, 100));
+ brokerLoadDataStore.pushAsync("2", createBrokerData(ctx,100, 100));
+ brokerLoadDataStore.pushAsync("3", null);
+ brokerLoadDataStore.pushAsync("4", null);
+ candidates.add("1");
+ candidates.add("2");
+ candidates.add("5");
var result = strategy.select(candidates, bundleData, ctx).get();
- assertEquals(result, "broker1");
+ assertEquals(result, "1");
strategy = new LeastResourceUsageWithWeight();
- brokerLoadDataStore.pushAsync("broker1", initBrokerData(100, 100));
+ brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,100, 100));
result = strategy.select(candidates, bundleData, ctx).get();
- assertThat(result, anyOf(equalTo("broker1"), equalTo("broker2"),
equalTo("broker5")));
+ assertThat(result, anyOf(equalTo("1"), equalTo("2"), equalTo("5")));
- brokerLoadDataStore.pushAsync("broker1", null);
- brokerLoadDataStore.pushAsync("broker2", null);
+ brokerLoadDataStore.pushAsync("1", null);
+ brokerLoadDataStore.pushAsync("2", null);
result = strategy.select(candidates, bundleData, ctx).get();
- assertThat(result, anyOf(equalTo("broker1"), equalTo("broker2"),
equalTo("broker5")));
+ assertThat(result, anyOf(equalTo("1"), equalTo("2"), equalTo("5")));
}
- private BrokerLoadData initBrokerData(double usage, double limit) {
+ private BrokerLoadData createBrokerData(LoadManagerContext ctx, double
usage, double limit) {
var brokerLoadData = new BrokerLoadData();
- brokerLoadData.setCpu(new ResourceUsage(usage, limit));
- brokerLoadData.setMemory(new ResourceUsage(usage, limit));
- brokerLoadData.setDirectMemory(new ResourceUsage(usage, limit));
- brokerLoadData.setBandwidthIn(new ResourceUsage(usage, limit));
- brokerLoadData.setBandwidthOut(new ResourceUsage(usage, limit));
+ SystemResourceUsage usages = createUsage(usage, limit);
+ brokerLoadData.update(usages, 1, 1, 1, 1,
+ ctx.brokerConfiguration());
return brokerLoadData;
}
+ private SystemResourceUsage createUsage(double usage, double limit) {
+ SystemResourceUsage usages = new SystemResourceUsage();
+ usages.setCpu(new ResourceUsage(usage, limit));
+ usages.setMemory(new ResourceUsage(usage, limit));
+ usages.setDirectMemory(new ResourceUsage(usage, limit));
+ usages.setBandwidthIn(new ResourceUsage(usage, limit));
+ usages.setBandwidthOut(new ResourceUsage(usage, limit));
+ return usages;
+ }
+
+ private void updateLoad(LoadManagerContext ctx, String broker, double
usage) {
+ ctx.brokerLoadDataStore().get(broker).get().update(createUsage(usage,
100.0),
+ 1, 1, 1, 1, ctx.brokerConfiguration());
+ }
+
public LoadManagerContext getContext() {
var ctx = mock(LoadManagerContext.class);
var conf = new ServiceConfiguration();
+ conf.setLoadBalancerCPUResourceWeight(1.0);
+ conf.setLoadBalancerMemoryResourceWeight(0.1);
+ conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
+ conf.setLoadBalancerBandwithInResourceWeight(1.0);
+ conf.setLoadBalancerBandwithOutResourceWeight(1.0);
+ conf.setLoadBalancerHistoryResourcePercentage(0.5);
+
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
var brokerLoadDataStore = new LoadDataStore<BrokerLoadData>() {
Map<String, BrokerLoadData> map = new HashMap<>();
@Override