heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912527219


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least 
resource usage with weight.
+ * This strategy takes into account the historical load percentage and 
short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements 
ModularLoadManagerStrategy {
+    private static Logger log = 
LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and 
short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final 
BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, 
conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight 
percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, 
BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY 
weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: 
{}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), 
localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), 
localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), 
conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight 
percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), 
maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according 
to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, 
BrokerData brokerData,
+                                                          ServiceConfiguration 
conf) {
+        final double historyPercentage = 
conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = 
brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - 
historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history 
resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: 
{}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, 
conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be 
assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData 
bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, 
brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                
conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() 
/ 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   It is good that we have a fallback case that randomly selects any broker 
when no broker qualifies (i.e., when the list of best brokers is empty).
   
   But with threshold=-10 and {b1: 1, b2: 25, b3: 25, b4: 25, ..., b10: 25}, then 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to