This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7a5c77c55 [improve][broker] PIP-192 Moved the common broker load 
data feature(weightedMaxEMA) to BrokerLoadData (#19154)
9e7a5c77c55 is described below

commit 9e7a5c77c55cce5adc13b305896e99c2d900056c
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jan 17 20:42:47 2023 -0800

    [improve][broker] PIP-192 Moved the common broker load data 
feature(weightedMaxEMA) to BrokerLoadData (#19154)
---
 .../extensions/data/BrokerLoadData.java            | 131 +++++++++++++----
 .../strategy/LeastResourceUsageWithWeight.java     |  79 +++-------
 .../extensions/data/BrokerLoadDataTest.java        | 122 +++++++++++++--
 .../strategy/LeastResourceUsageWithWeightTest.java | 163 ++++++++++-----------
 4 files changed, 312 insertions(+), 183 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
index fbb5093939e..39419946992 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.data;
 
-import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -29,24 +31,42 @@ import 
org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
  * Migrate from {@link 
org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
  * And removed the lookup data, see {@link BrokerLookupData}
  */
-@Data
+@Getter
+@EqualsAndHashCode
 public class BrokerLoadData {
 
+    private static final double DEFAULT_RESOURCE_USAGE = 1.0d;
+
     // Most recently available system resource usage.
     private ResourceUsage cpu;
     private ResourceUsage memory;
     private ResourceUsage directMemory;
-
     private ResourceUsage bandwidthIn;
     private ResourceUsage bandwidthOut;
 
     // Message data from the most recent namespace bundle stats.
-    private double msgThroughputIn;
-    private ResourceUsage msgThroughputInUsage;
-    private double msgThroughputOut;
-    private ResourceUsage msgThroughputOutUsage;
-    private double msgRateIn;
-    private double msgRateOut;
+    private double msgThroughputIn; // bytes/sec
+    private double msgThroughputOut;  // bytes/sec
+    private double msgRateIn; // messages/sec
+    private double msgRateOut; // messages/sec
+
+    // Load data features computed from the above resources.
+    private double maxResourceUsage; // max of resource usages
+    /**
+     * Exponential moving average(EMA) of max of weighted resource usages among
+     * cpu, memory, directMemory, bandwidthIn and bandwidthOut.
+     *
+     * The resource weights are configured by :
+     * loadBalancerCPUResourceWeight,
+     * loadBalancerMemoryResourceWeight,
+     * loadBalancerDirectMemoryResourceWeight,
+     * loadBalancerBandwithInResourceWeight, and
+     * loadBalancerBandwithOutResourceWeight.
+     *
+     * The historical resource percentage is configured by 
loadBalancerHistoryResourcePercentage.
+     */
+    private double weightedMaxEMA;
+    private long updatedAt;
 
     public BrokerLoadData() {
         cpu = new ResourceUsage();
@@ -54,34 +74,56 @@ public class BrokerLoadData {
         directMemory = new ResourceUsage();
         bandwidthIn = new ResourceUsage();
         bandwidthOut = new ResourceUsage();
-        msgThroughputInUsage = new ResourceUsage();
-        msgThroughputOutUsage = new ResourceUsage();
+        maxResourceUsage = DEFAULT_RESOURCE_USAGE;
+        weightedMaxEMA = DEFAULT_RESOURCE_USAGE;
     }
 
     /**
-     * Using the system resource usage and bundle stats acquired from the 
Pulsar client, update this LocalBrokerData.
+     * Using the system resource usage from the Pulsar client, update this 
BrokerLoadData.
      *
-     * @param systemResourceUsage
+     * @param usage
      *            System resource usage (cpu, memory, and direct memory).
+     * @param msgThroughputIn
+     *            broker-level message input throughput in bytes/s.
+     * @param msgThroughputOut
+     *            broker-level message output throughput in bytes/s.
+     * @param msgRateIn
+     *            broker-level message input rate in messages/s.
+     * @param msgRateOut
+     *            broker-level message output rate in messages/s.
+     * @param conf
+     *            Service configuration to compute load data features.
      */
-    public void update(final SystemResourceUsage systemResourceUsage) {
-        updateSystemResourceUsage(systemResourceUsage);
+    public void update(final SystemResourceUsage usage,
+                       double msgThroughputIn,
+                       double msgThroughputOut,
+                       double msgRateIn,
+                       double msgRateOut,
+                       ServiceConfiguration conf) {
+        updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, 
usage.bandwidthIn, usage.bandwidthOut);
+        this.msgThroughputIn = msgThroughputIn;
+        this.msgThroughputOut = msgThroughputOut;
+        this.msgRateIn = msgRateIn;
+        this.msgRateOut = msgRateOut;
+        updateFeatures(conf);
+        updatedAt = System.currentTimeMillis();
     }
 
     /**
-     * Using another LocalBrokerData, update this.
+     * Using another BrokerLoadData, update this.
      *
      * @param other
-     *            LocalBrokerData to update from.
+     *            BrokerLoadData to update from.
      */
     public void update(final BrokerLoadData other) {
         updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, 
other.bandwidthIn, other.bandwidthOut);
-    }
-
-    // Set the cpu, memory, and direct memory to that of the new system 
resource usage data.
-    private void updateSystemResourceUsage(final SystemResourceUsage 
systemResourceUsage) {
-        updateSystemResourceUsage(systemResourceUsage.cpu, 
systemResourceUsage.memory, systemResourceUsage.directMemory,
-                systemResourceUsage.bandwidthIn, 
systemResourceUsage.bandwidthOut);
+        msgThroughputIn = other.msgThroughputIn;
+        msgThroughputOut = other.msgThroughputOut;
+        msgRateIn = other.msgRateIn;
+        msgRateOut = other.msgRateOut;
+        weightedMaxEMA = other.weightedMaxEMA;
+        maxResourceUsage = other.maxResourceUsage;
+        updatedAt = other.updatedAt;
     }
 
     // Update resource usage given each individual usage.
@@ -95,12 +137,18 @@ public class BrokerLoadData {
         this.bandwidthOut = bandwidthOut;
     }
 
-    public double getMaxResourceUsage() {
-        return LocalBrokerData.max(cpu.percentUsage(), 
directMemory.percentUsage(), bandwidthIn.percentUsage(),
+    private void updateFeatures(ServiceConfiguration conf) {
+        updateMaxResourceUsage();
+        updateWeightedMaxEMA(conf);
+    }
+
+    private void updateMaxResourceUsage() {
+        maxResourceUsage = LocalBrokerData.max(cpu.percentUsage(), 
directMemory.percentUsage(),
+                bandwidthIn.percentUsage(),
                 bandwidthOut.percentUsage()) / 100;
     }
 
-    public double getMaxResourceUsageWithWeight(final double cpuWeight, final 
double memoryWeight,
+    private double getMaxResourceUsageWithWeight(final double cpuWeight, final 
double memoryWeight,
                                                 final double 
directMemoryWeight, final double bandwidthInWeight,
                                                 final double 
bandwidthOutWeight) {
         return LocalBrokerData.max(cpu.percentUsage() * cpuWeight, 
memory.percentUsage() * memoryWeight,
@@ -108,4 +156,35 @@ public class BrokerLoadData {
                 bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
     }
 
+    private void updateWeightedMaxEMA(ServiceConfiguration conf) {
+        var historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
+        var weightedMax = getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        weightedMaxEMA = updatedAt == 0 ? weightedMax :
+                weightedMaxEMA * historyPercentage + (1 - historyPercentage) * 
weightedMax;
+    }
+
+    public String toString(ServiceConfiguration conf) {
+        return String.format("cpu= %.2f%%, memory= %.2f%%, directMemory= 
%.2f%%, "
+                        + "bandwithIn= %.2f%%, bandwithOut= %.2f%%, "
+                        + "cpuWeight= %f, memoryWeight= %f, 
directMemoryWeight= %f, "
+                        + "bandwithInResourceWeight= %f, 
bandwithOutResourceWeight= %f, "
+                        + "msgThroughputIn= %.2f, msgThroughputOut= %.2f, 
msgRateIn= %.2f, msgRateOut= %.2f, "
+                        + "maxResourceUsage= %.2f%%, weightedMaxEMA= %.2f%%, 
updatedAt= %d",
+
+                cpu.percentUsage(), memory.percentUsage(), 
directMemory.percentUsage(),
+                bandwidthIn.percentUsage(), bandwidthOut.percentUsage(),
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight(),
+                msgThroughputIn, msgThroughputOut, msgRateIn, msgRateOut,
+                maxResourceUsage * 100, weightedMaxEMA * 100, updatedAt
+        );
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index 829dc7ce12a..f48bab54f89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -19,10 +19,8 @@
 package org.apache.pulsar.broker.loadbalance.extensions.strategy;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -39,15 +37,12 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
  */
 @Slf4j
 public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
-    private static final double MAX_RESOURCE_USAGE = 1.0d;
     // Maintain this list to reduce object creation.
     private final ArrayList<String> bestBrokers;
     private final Set<String> noLoadDataBrokers;
-    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
 
     public LeastResourceUsageWithWeight() {
         this.bestBrokers = new ArrayList<>();
-        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
         this.noLoadDataBrokers = new HashSet<>();
     }
 
@@ -55,65 +50,24 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
     private double getMaxResourceUsageWithWeight(final String broker, final 
BrokerLoadData brokerLoadData,
                                                  final ServiceConfiguration 
conf, boolean debugMode) {
         final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
-        final double maxUsageWithWeight =
-                updateAndGetMaxResourceUsageWithWeight(broker, brokerLoadData, 
conf, debugMode);
+        final var maxUsageWithWeight = brokerLoadData.getWeightedMaxEMA();
+
 
         if (maxUsageWithWeight > overloadThreshold) {
             log.warn(
-                    "Broker {} is overloaded, max resource usage with weight 
percentage: {}%, "
-                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, 
BANDWIDTH IN: {}%, "
-                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY 
weight: {}, DIRECT MEMORY weight: {}, "
-                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: 
{}",
-                    broker, maxUsageWithWeight * 100,
-                    brokerLoadData.getCpu().percentUsage(), 
brokerLoadData.getMemory().percentUsage(),
-                    brokerLoadData.getDirectMemory().percentUsage(), 
brokerLoadData.getBandwidthIn().percentUsage(),
-                    brokerLoadData.getBandwidthOut().percentUsage(), 
conf.getLoadBalancerCPUResourceWeight(),
-                    conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
-                    conf.getLoadBalancerBandwithInResourceWeight(),
-                    conf.getLoadBalancerBandwithOutResourceWeight());
+                    "Broker {} is overloaded, brokerLoad({}%) > 
overloadThreshold({}%). load data:{{}}",
+                    broker,
+                    maxUsageWithWeight * 100,
+                    overloadThreshold * 100,
+                    brokerLoadData.toString(conf));
+        } else if (debugMode) {
+            log.info("Broker {} load data:{{}}", broker, 
brokerLoadData.toString(conf));
         }
 
-        if (debugMode) {
-            log.info("Broker {} has max resource usage with weight percentage: 
{}%",
-                    broker, maxUsageWithWeight * 100);
-        }
+
         return maxUsageWithWeight;
     }
 
-    /**
-     * Update and get the max resource usage with weight of broker according 
to the service configuration.
-     *
-     * @param broker     The broker name.
-     * @param brokerData The broker load data.
-     * @param conf       The service configuration.
-     * @param debugMode  The debug mode to print computed load states and 
decision information.
-     * @return the max resource usage with weight of broker
-     */
-    private double updateAndGetMaxResourceUsageWithWeight(String broker, 
BrokerLoadData brokerData,
-                                                          ServiceConfiguration 
conf, boolean debugMode) {
-        final double historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
-        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
-        double resourceUsage = brokerData.getMaxResourceUsageWithWeight(
-                conf.getLoadBalancerCPUResourceWeight(),
-                conf.getLoadBalancerMemoryResourceWeight(),
-                conf.getLoadBalancerDirectMemoryResourceWeight(),
-                conf.getLoadBalancerBandwithInResourceWeight(),
-                conf.getLoadBalancerBandwithOutResourceWeight());
-        historyUsage = historyUsage == null
-                ? resourceUsage : historyUsage * historyPercentage + (1 - 
historyPercentage) * resourceUsage;
-        if (debugMode) {
-            log.info(
-                    "Broker {} get max resource usage with weight: {}, history 
resource percentage: {}%, CPU weight: "
-                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: 
{}, BANDWIDTH IN weight: {}, BANDWIDTH "
-                            + "OUT weight: {} ",
-                    broker, historyUsage, historyPercentage, 
conf.getLoadBalancerCPUResourceWeight(),
-                    conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
-                    conf.getLoadBalancerBandwithInResourceWeight(),
-                    conf.getLoadBalancerBandwithOutResourceWeight());
-        }
-        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
-        return historyUsage;
-    }
 
     /**
      * Find a suitable broker to assign the given bundle to.
@@ -143,7 +97,7 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
         for (String broker : candidates) {
             var brokerLoadDataOptional = 
context.brokerLoadDataStore().get(broker);
             if (brokerLoadDataOptional.isEmpty()) {
-                log.warn("There is no broker load data for broker:{}. Skipping 
this broker.", broker);
+                log.warn("There is no broker load data for broker:{}. Skipping 
this broker. Phase one", broker);
                 noLoadDataBrokers.add(broker);
                 continue;
             }
@@ -162,12 +116,17 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
             if (debugMode) {
                 log.info("Computed avgUsage:{}, diffThreshold:{}", avgUsage, 
diffThreshold);
             }
-            candidates.forEach(broker -> {
-                Double avgResUsage = 
brokerAvgResourceUsageWithWeight.getOrDefault(broker, MAX_RESOURCE_USAGE);
+            for (String broker : candidates) {
+                var brokerLoadDataOptional = 
context.brokerLoadDataStore().get(broker);
+                if (brokerLoadDataOptional.isEmpty()) {
+                    log.warn("There is no broker load data for broker:{}. 
Skipping this broker. Phase two", broker);
+                    continue;
+                }
+                double avgResUsage = 
brokerLoadDataOptional.get().getWeightedMaxEMA();
                 if ((avgResUsage + diffThreshold <= avgUsage && 
!noLoadDataBrokers.contains(broker))) {
                     bestBrokers.add(broker);
                 }
-            });
+            }
         }
 
         if (bestBrokers.isEmpty()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
index 58805af922b..cedf7bca5d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.data;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.testng.Assert.assertEquals;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.testng.annotations.Test;
 
 /**
@@ -31,21 +35,109 @@ import org.testng.annotations.Test;
 public class BrokerLoadDataTest {
 
     @Test
-    public void testMaxResourceUsage() {
+    public void testUpdateBySystemResourceUsage() {
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setLoadBalancerCPUResourceWeight(0.5);
+        conf.setLoadBalancerMemoryResourceWeight(0.5);
+        conf.setLoadBalancerDirectMemoryResourceWeight(0.5);
+        conf.setLoadBalancerBandwithInResourceWeight(0.5);
+        conf.setLoadBalancerBandwithOutResourceWeight(0.5);
+        conf.setLoadBalancerHistoryResourcePercentage(0.75);
+
         BrokerLoadData data = new BrokerLoadData();
-        data.setCpu(new ResourceUsage(1.0, 100.0));
-        data.setMemory(new ResourceUsage(800.0, 200.0));
-        data.setDirectMemory(new ResourceUsage(2.0, 100.0));
-        data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
-        data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
-
-        double epsilon = 0.00001;
-        double weight = 0.5;
-        // skips memory usage
-        assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
-
-        assertEquals(
-                data.getMaxResourceUsageWithWeight(
-                        weight, weight, weight, weight, weight), 2.0, epsilon);
+
+        long now = System.currentTimeMillis();
+        SystemResourceUsage usage1 = new SystemResourceUsage();
+        var cpu = new ResourceUsage(1.0, 100.0);
+        var memory = new ResourceUsage(800.0, 200.0);
+        var directMemory= new ResourceUsage(2.0, 100.0);
+        var bandwidthIn= new ResourceUsage(3.0, 100.0);
+        var bandwidthOut= new ResourceUsage(4.0, 100.0);
+        usage1.setCpu(cpu);
+        usage1.setMemory(memory);
+        usage1.setDirectMemory(directMemory);
+        usage1.setBandwidthIn(bandwidthIn);
+        usage1.setBandwidthOut(bandwidthOut);
+        data.update(usage1, 1,2,3,4, conf);
+
+        assertEquals(data.getCpu(), cpu);
+        assertEquals(data.getMemory(), memory);
+        assertEquals(data.getDirectMemory(), directMemory);
+        assertEquals(data.getBandwidthIn(), bandwidthIn);
+        assertEquals(data.getBandwidthOut(), bandwidthOut);
+        assertEquals(data.getMsgThroughputIn(), 1.0);
+        assertEquals(data.getMsgThroughputOut(), 2.0);
+        assertEquals(data.getMsgRateIn(), 3.0);
+        assertEquals(data.getMsgRateOut(), 4.0);
+        assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage
+        assertEquals(data.getWeightedMaxEMA(), 2);
+        assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
+
+        now = System.currentTimeMillis();
+        SystemResourceUsage usage2 = new SystemResourceUsage();
+        cpu = new ResourceUsage(300.0, 100.0);
+        memory = new ResourceUsage(200.0, 200.0);
+        directMemory= new ResourceUsage(2.0, 100.0);
+        bandwidthIn= new ResourceUsage(3.0, 100.0);
+        bandwidthOut= new ResourceUsage(4.0, 100.0);
+        usage2.setCpu(cpu);
+        usage2.setMemory(memory);
+        usage2.setDirectMemory(directMemory);
+        usage2.setBandwidthIn(bandwidthIn);
+        usage2.setBandwidthOut(bandwidthOut);
+        data.update(usage2, 5,6,7,8, conf);
+
+        assertEquals(data.getCpu(), cpu);
+        assertEquals(data.getMemory(), memory);
+        assertEquals(data.getDirectMemory(), directMemory);
+        assertEquals(data.getBandwidthIn(), bandwidthIn);
+        assertEquals(data.getBandwidthOut(), bandwidthOut);
+        assertEquals(data.getMsgThroughputIn(), 5.0);
+        assertEquals(data.getMsgThroughputOut(), 6.0);
+        assertEquals(data.getMsgRateIn(), 7.0);
+        assertEquals(data.getMsgRateOut(), 8.0);
+        assertEquals(data.getMaxResourceUsage(), 3.0);
+        assertEquals(data.getWeightedMaxEMA(), 1.875);
+        assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
+        assertEquals(data.toString(conf), "cpu= 300.00%, memory= 100.00%, 
directMemory= 2.00%, "
+                + "bandwithIn= 3.00%, bandwithOut= 4.00%, "
+                + "cpuWeight= 0.500000, memoryWeight= 0.500000, 
directMemoryWeight= 0.500000, "
+                + "bandwithInResourceWeight= 0.500000, 
bandwithOutResourceWeight= 0.500000, "
+                + "msgThroughputIn= 5.00, msgThroughputOut= 6.00, "
+                + "msgRateIn= 7.00, msgRateOut= 8.00,"
+                + " maxResourceUsage= 300.00%, weightedMaxEMA= 187.50%, 
updatedAt= " + data.getUpdatedAt());
     }
+
+    @Test
+    public void testUpdateByBrokerLoadData() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setLoadBalancerCPUResourceWeight(0.5);
+        conf.setLoadBalancerMemoryResourceWeight(0.5);
+        conf.setLoadBalancerDirectMemoryResourceWeight(0.5);
+        conf.setLoadBalancerBandwithInResourceWeight(0.5);
+        conf.setLoadBalancerBandwithOutResourceWeight(0.5);
+        conf.setLoadBalancerHistoryResourcePercentage(0.75);
+
+        BrokerLoadData data = new BrokerLoadData();
+        BrokerLoadData other = new BrokerLoadData();
+
+        SystemResourceUsage usage1 = new SystemResourceUsage();
+        var cpu = new ResourceUsage(1.0, 100.0);
+        var memory = new ResourceUsage(800.0, 200.0);
+        var directMemory= new ResourceUsage(2.0, 100.0);
+        var bandwidthIn= new ResourceUsage(3.0, 100.0);
+        var bandwidthOut= new ResourceUsage(4.0, 100.0);
+        usage1.setCpu(cpu);
+        usage1.setMemory(memory);
+        usage1.setDirectMemory(directMemory);
+        usage1.setBandwidthIn(bandwidthIn);
+        usage1.setBandwidthOut(bandwidthOut);
+        other.update(usage1, 1,2,3,4, conf);
+        data.update(other);
+
+        assertEquals(data, other);
+    }
+
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index 6a260dff72d..db3d8f9304c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +32,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
+import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
@@ -42,13 +41,13 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class LeastResourceUsageWithWeightTest {
 
     // Test that least resource usage with weight works correctly.
-
     ServiceUnitId bundleData = new ServiceUnitId() {
         @Override
         public NamespaceName getNamespaceObject() {
@@ -65,10 +64,10 @@ public class LeastResourceUsageWithWeightTest {
         var ctx = getContext();
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(10, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker3", initBrokerData(60, 100));
-        brokerLoadDataStore.pushAsync("broker4", initBrokerData(5, 100));
+        brokerLoadDataStore.pushAsync("1", createBrokerData(ctx, 10, 100));
+        brokerLoadDataStore.pushAsync("2", createBrokerData(ctx, 30, 100));
+        brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 60, 100));
+        brokerLoadDataStore.pushAsync("4", createBrokerData(ctx, 5, 100));
 
         return ctx;
     }
@@ -76,85 +75,65 @@ public class LeastResourceUsageWithWeightTest {
     public void testSelect() {
 
         var ctx = setupContext();
-        ServiceConfiguration conf = ctx.brokerConfiguration();
-        conf.setLoadBalancerCPUResourceWeight(1.0);
-        conf.setLoadBalancerMemoryResourceWeight(0.1);
-        conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
-        conf.setLoadBalancerBandwithInResourceWeight(1.0);
-        conf.setLoadBalancerBandwithOutResourceWeight(1.0);
-        conf.setLoadBalancerHistoryResourcePercentage(0.5);
-        
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
-
         LeastResourceUsageWithWeight strategy = new 
LeastResourceUsageWithWeight();
 
-
-        var brokers = ctx.brokerLoadDataStore().entrySet().stream()
-                .map(e -> e.getKey()).collect(Collectors.toList());
-        // Make brokerAvgResourceUsageWithWeight contain broker4.
-        strategy.select(brokers, bundleData, ctx);
-
         // Should choice broker from broker1 2 3.
         List<String> candidates = new ArrayList<>();
-        candidates.add("broker1");
-        candidates.add("broker2");
-        candidates.add("broker3");
+        candidates.add("1");
+        candidates.add("2");
+        candidates.add("3");
 
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker1"));
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("1"));
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(20, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker3", initBrokerData(50, 100));
-        brokerLoadDataStore.pushAsync("broker4", null);
+        brokerLoadDataStore.pushAsync("1", createBrokerData(ctx, 20, 100));
+        brokerLoadDataStore.pushAsync("2", createBrokerData(ctx, 30, 100));
+        brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 50, 100));
+
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("1"));
 
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker1"));
+        updateLoad(ctx, "1", 30);
+        updateLoad(ctx, "2", 30);
+        updateLoad(ctx, "3", 40);
 
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker3", initBrokerData(40, 100));
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("1"));
 
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker1"));
+        updateLoad(ctx, "1", 30);
+        updateLoad(ctx, "2", 30);
+        updateLoad(ctx, "3", 40);
 
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(30, 100));
-        brokerLoadDataStore.pushAsync("broker3", initBrokerData(40, 100));
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("1"));
 
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker1"));
+        updateLoad(ctx, "1", 35);
+        updateLoad(ctx, "2", 20);
+        updateLoad(ctx, "3", 45);
 
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(35, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(20, 100));
-        brokerLoadDataStore.pushAsync("broker3", initBrokerData(45, 100));
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("2"));
 
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker2"));
+        // test restart broker can load bundle as one of the best brokers.
+        updateLoad(ctx, "1", 35);
+        updateLoad(ctx, "2", 20);
+        brokerLoadDataStore.pushAsync("3", createBrokerData(ctx, 0, 100));
+
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("3"));
     }
 
     public void testArithmeticException()
             throws NoSuchFieldException, IllegalAccessException {
         var ctx = setupContext();
-        var conf = ctx.brokerConfiguration();
-        conf.setLoadBalancerCPUResourceWeight(1.0);
-        conf.setLoadBalancerMemoryResourceWeight(0.1);
-        conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
-        conf.setLoadBalancerBandwithInResourceWeight(1.0);
-        conf.setLoadBalancerBandwithOutResourceWeight(1.0);
-        conf.setLoadBalancerHistoryResourcePercentage(0.5);
-        
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
-
+        var brokerLoadStore = ctx.brokerLoadDataStore();
         LeastResourceUsageWithWeight strategy = new 
LeastResourceUsageWithWeight();
 
         // Should choice broker from broker1 2 3.
         List<String> candidates = new ArrayList<>();
-        candidates.add("broker1");
-        candidates.add("broker2");
-        candidates.add("broker3");
-        Field strategyUpdater = 
LeastResourceUsageWithWeight.class.getDeclaredField("brokerAvgResourceUsageWithWeight");
-        strategyUpdater.setAccessible(true);
-        Map<String, Double> brokerAvgResourceUsageWithWeight = new HashMap<>();
-        brokerAvgResourceUsageWithWeight.put("broker1", 0.1d);
-        brokerAvgResourceUsageWithWeight.put("broker2", 0.3d);
-        brokerAvgResourceUsageWithWeight.put("broker4", 0.05d);
-        strategyUpdater.set(strategy, brokerAvgResourceUsageWithWeight);
-        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("broker1"));
+        candidates.add("1");
+        candidates.add("2");
+        candidates.add("3");
+
+        FieldUtils.writeDeclaredField(brokerLoadStore.get("1").get(), 
"weightedMaxEMA", 0.1d, true);
+        FieldUtils.writeDeclaredField(brokerLoadStore.get("2").get(), 
"weightedMaxEMA", 0.3d, true);
+        FieldUtils.writeDeclaredField(brokerLoadStore.get("4").get(), 
"weightedMaxEMA", 0.05d, true);
+        assertEquals(strategy.select(candidates, bundleData, ctx), 
Optional.of("1"));
     }
 
     public void testNoLoadDataBrokers() {
@@ -164,42 +143,62 @@ public class LeastResourceUsageWithWeightTest {
 
         List<String> candidates = new ArrayList<>();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(50, 100));
-        brokerLoadDataStore.pushAsync("broker2", initBrokerData(100, 100));
-        brokerLoadDataStore.pushAsync("broker3", null);
-        brokerLoadDataStore.pushAsync("broker4", null);
-        candidates.add("broker1");
-        candidates.add("broker2");
-        candidates.add("broker5");
+        brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,50, 100));
+        brokerLoadDataStore.pushAsync("2", createBrokerData(ctx,100, 100));
+        brokerLoadDataStore.pushAsync("3", null);
+        brokerLoadDataStore.pushAsync("4", null);
+        candidates.add("1");
+        candidates.add("2");
+        candidates.add("5");
         var result = strategy.select(candidates, bundleData, ctx).get();
-        assertEquals(result, "broker1");
+        assertEquals(result, "1");
 
         strategy = new LeastResourceUsageWithWeight();
-        brokerLoadDataStore.pushAsync("broker1", initBrokerData(100, 100));
+        brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,100, 100));
         result = strategy.select(candidates, bundleData, ctx).get();
-        assertThat(result, anyOf(equalTo("broker1"), equalTo("broker2"), 
equalTo("broker5")));
+        assertThat(result, anyOf(equalTo("1"), equalTo("2"), equalTo("5")));
 
-        brokerLoadDataStore.pushAsync("broker1", null);
-        brokerLoadDataStore.pushAsync("broker2", null);
+        brokerLoadDataStore.pushAsync("1", null);
+        brokerLoadDataStore.pushAsync("2", null);
 
         result = strategy.select(candidates, bundleData, ctx).get();
-        assertThat(result, anyOf(equalTo("broker1"), equalTo("broker2"), 
equalTo("broker5")));
+        assertThat(result, anyOf(equalTo("1"), equalTo("2"), equalTo("5")));
     }
 
 
-    private BrokerLoadData initBrokerData(double usage, double limit) {
+    private BrokerLoadData createBrokerData(LoadManagerContext ctx, double 
usage, double limit) {
         var brokerLoadData = new BrokerLoadData();
-        brokerLoadData.setCpu(new ResourceUsage(usage, limit));
-        brokerLoadData.setMemory(new ResourceUsage(usage, limit));
-        brokerLoadData.setDirectMemory(new ResourceUsage(usage, limit));
-        brokerLoadData.setBandwidthIn(new ResourceUsage(usage, limit));
-        brokerLoadData.setBandwidthOut(new ResourceUsage(usage, limit));
+        SystemResourceUsage usages = createUsage(usage, limit);
+        brokerLoadData.update(usages, 1, 1, 1, 1,
+                ctx.brokerConfiguration());
         return brokerLoadData;
     }
 
+    private SystemResourceUsage createUsage(double usage, double limit) {
+        SystemResourceUsage usages = new SystemResourceUsage();
+        usages.setCpu(new ResourceUsage(usage, limit));
+        usages.setMemory(new ResourceUsage(usage, limit));
+        usages.setDirectMemory(new ResourceUsage(usage, limit));
+        usages.setBandwidthIn(new ResourceUsage(usage, limit));
+        usages.setBandwidthOut(new ResourceUsage(usage, limit));
+        return usages;
+    }
+
+    private void updateLoad(LoadManagerContext ctx, String broker, double 
usage) {
+        ctx.brokerLoadDataStore().get(broker).get().update(createUsage(usage, 
100.0),
+                1, 1, 1, 1, ctx.brokerConfiguration());
+    }
+
     public LoadManagerContext getContext() {
         var ctx = mock(LoadManagerContext.class);
         var conf = new ServiceConfiguration();
+        conf.setLoadBalancerCPUResourceWeight(1.0);
+        conf.setLoadBalancerMemoryResourceWeight(0.1);
+        conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
+        conf.setLoadBalancerBandwithInResourceWeight(1.0);
+        conf.setLoadBalancerBandwithOutResourceWeight(1.0);
+        conf.setLoadBalancerHistoryResourcePercentage(0.5);
+        
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5);
         var brokerLoadDataStore = new LoadDataStore<BrokerLoadData>() {
             Map<String, BrokerLoadData> map = new HashMap<>();
             @Override


Reply via email to