This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ee2c3b5 Enable computation of evenness score based on a preferred 
scoring key (#2738)
86ee2c3b5 is described below

commit 86ee2c3b5165da12f0acca8c7756848f9d269e11
Author: Charanya Sudharsanan <[email protected]>
AuthorDate: Fri Jan 26 18:28:37 2024 -0800

    Enable computation of evenness score based on a preferred scoring key 
(#2738)
    
    This PR provides a way for users to specify a list of PreferredScoringKeys 
that will help in computing evenness scores based on the preferred capacity 
keys only.
---
 .../MaxCapacityUsageInstanceConstraint.java        |  2 +-
 ...TopStateMaxCapacityUsageInstanceConstraint.java |  2 +-
 .../rebalancer/waged/model/AssignableNode.java     | 54 +++++++++++++++++--
 .../rebalancer/waged/model/ClusterContext.java     | 61 +++++++++++++++++++---
 .../waged/model/ClusterModelProvider.java          |  2 +-
 .../java/org/apache/helix/model/ClusterConfig.java | 26 ++++++++-
 .../TestMaxCapacityUsageInstanceConstraint.java    | 22 +++++++-
 ...TopStateMaxCapacityUsageInstanceConstraint.java | 24 ++++++++-
 .../rebalancer/waged/model/TestClusterContext.java | 46 ++++++++++++++++
 9 files changed, 223 insertions(+), 16 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 7d74c2680..1a7e6521f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -37,7 +37,7 @@ class MaxCapacityUsageInstanceConstraint extends 
UsageSoftConstraint {
       ClusterContext clusterContext) {
     float estimatedMaxUtilization = 
clusterContext.getEstimatedMaxUtilization();
     float projectedHighestUtilization =
-        node.getGeneralProjectedHighestUtilization(replica.getCapacity());
+        node.getGeneralProjectedHighestUtilization(replica.getCapacity(), 
clusterContext.getPreferredScoringKeys());
     return computeUtilizationScore(estimatedMaxUtilization, 
projectedHighestUtilization);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 145425398..53584727f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -41,7 +41,7 @@ class TopStateMaxCapacityUsageInstanceConstraint extends 
UsageSoftConstraint {
     }
     float estimatedTopStateMaxUtilization = 
clusterContext.getEstimatedTopStateMaxUtilization();
     float projectedHighestUtilization =
-        node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+        node.getTopStateProjectedHighestUtilization(replica.getCapacity(), 
clusterContext.getPreferredScoringKeys());
     return computeUtilizationScore(estimatedTopStateMaxUtilization, 
projectedHighestUtilization);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index b4e8c3c71..a364dd9d1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -249,7 +249,27 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * @return The highest utilization number of the node among all the capacity 
category.
    */
   public float getGeneralProjectedHighestUtilization(Map<String, Integer> 
newUsage) {
-    return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity, null);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for evenly 
partition assignment.
+   * The method dynamically calculates the projected highest utilization 
number among all the
+   * capacity categories assuming the new capacity usage is added to the node.
+   *
+   * If the list of preferredScoringKeys is specified then utilization number 
is computed based on the
+   * specified capacity category (keys) in the list only.
+   *
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 
0.6}, preferredScoringKeys: [ CPU ]
+   * Then this call shall return 0.9.
+   *
+   * @param newUsage            the proposed new additional capacity usage.
+   * @param preferredScoringKeys if provided, the capacity utilization will be 
calculated based on
+   *                            the supplied keys only, else across all 
capacity categories.
+   * @return The highest utilization number of the node among the specified 
capacity category.
+   */
+  public float getGeneralProjectedHighestUtilization(Map<String, Integer> 
newUsage, List<String> preferredScoringKeys) {
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity, 
preferredScoringKeys);
   }
 
   /**
@@ -263,13 +283,39 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * @return The highest utilization number of the node among all the capacity 
category.
    */
   public float getTopStateProjectedHighestUtilization(Map<String, Integer> 
newUsage) {
-    return getProjectedHighestUtilization(newUsage, 
_remainingTopStateCapacity);
+    return getProjectedHighestUtilization(newUsage, 
_remainingTopStateCapacity, null);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for evenly 
partition assignment.
+   * The method dynamically calculates the projected highest utilization 
number among all the
+   * capacity categories assuming the new capacity usage is added to the node.
+   *
+   * If the list of preferredScoringKeys is specified then utilization number 
is computed based on the
+   * specified capacity category (keys) in the list only.
+   *
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 
0.6}, preferredScoringKeys: [ CPU ]
+   * Then this call shall return 0.9.
+   *
+   * This function returns projected highest utilization for only top state 
partitions.
+   *
+   * @param newUsage            the proposed new additional capacity usage.
+   * @param preferredScoringKeys if provided, the capacity utilization will be 
calculated based on
+   *                            the supplied keys only, else across all 
capacity categories.
+   * @return The highest utilization number of the node among the specified capacity 
categories.
+   */
+  public float getTopStateProjectedHighestUtilization(Map<String, Integer> 
newUsage, List<String> preferredScoringKeys) {
+    return getProjectedHighestUtilization(newUsage, 
_remainingTopStateCapacity, preferredScoringKeys);
   }
 
   private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
-      Map<String, Integer> remainingCapacity) {
+      Map<String, Integer> remainingCapacity, List<String> 
preferredScoringKeys) {
+    Set<String> capacityKeySet = _maxAllowedCapacity.keySet();
+    if (preferredScoringKeys != null && preferredScoringKeys.size() != 0 && 
capacityKeySet.contains(preferredScoringKeys.get(0))) {
+      capacityKeySet = 
preferredScoringKeys.stream().collect(Collectors.toSet());
+    }
     float highestCapacityUtilization = 0;
-    for (String capacityKey : _maxAllowedCapacity.keySet()) {
+    for (String capacityKey : capacityKeySet) {
       float capacityValue = _maxAllowedCapacity.get(capacityKey);
       float utilization = (capacityValue - remainingCapacity.get(capacityKey) 
+ newUsage
           .getOrDefault(capacityKey, 0)) / capacityValue;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 07e33df16..1b3ad2a87 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -24,17 +24,22 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * This class tracks the rebalance-related global cluster status.
  */
 public class ClusterContext {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusterContext.class.getName());
   // This estimation helps to ensure global partition count evenness
   private final int _estimatedMaxPartitionCount;
   // This estimation helps to ensure global top state replica count evenness
@@ -57,20 +62,29 @@ public class ClusterContext {
   private final Map<String, Integer> _estimateUtilizationMap;
   // Cluster total capacity. Used to compute score when sorting replicas.
   private final Map<String, Integer> _clusterCapacityMap;
-
+  private final List<String> _preferredScoringKeys;
+  private final String _clusterName;
   /**
    * Construct the cluster context based on the current instance status.
    * @param replicaSet All the partition replicas that are managed by the 
rebalancer
    * @param nodeSet All the active nodes that are managed by the rebalancer
    */
   ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> 
nodeSet,
-      Map<String, ResourceAssignment> baselineAssignment, Map<String, 
ResourceAssignment> bestPossibleAssignment) {
+                 Map<String, ResourceAssignment> baselineAssignment, 
Map<String, ResourceAssignment> bestPossibleAssignment) {
+    this(replicaSet, nodeSet, baselineAssignment, bestPossibleAssignment, 
null);
+  }
+
+  ClusterContext(Set<AssignableReplica> replicaSet, Set<AssignableNode> 
nodeSet,
+                 Map<String, ResourceAssignment> baselineAssignment, 
Map<String, ResourceAssignment> bestPossibleAssignment,
+                 ClusterConfig clusterConfig) {
     int instanceCount = nodeSet.size();
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
     Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
+    _preferredScoringKeys = 
Optional.ofNullable(clusterConfig).map(ClusterConfig::getPreferredScoringKeys).orElse(null);
+    _clusterName = 
Optional.ofNullable(clusterConfig).map(ClusterConfig::getClusterName).orElse(null);
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
         .collect(Collectors.groupingBy(AssignableReplica::getResourceName))
@@ -103,17 +117,31 @@ public class ClusterContext {
       _estimateUtilizationMap = Collections.emptyMap();
       _clusterCapacityMap = Collections.emptyMap();
     } else {
-      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalUsage);
-      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalTopStateUsage);
+      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalUsage, _preferredScoringKeys);
+      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalTopStateUsage, _preferredScoringKeys);
       _estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
       _clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
     }
+    LOG.info(
+        "clusterName: {}, preferredScoringKeys: {}, estimatedMaxUtilization: 
{}, estimatedTopStateMaxUtilization: {}",
+        _clusterName, _preferredScoringKeys, _estimatedMaxUtilization,
+        _estimatedTopStateMaxUtilization);
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
     _estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
     _baselineAssignment = baselineAssignment;
     _bestPossibleAssignment = bestPossibleAssignment;
   }
 
+
+  /**
+   * Get List of preferred scoring keys if set.
+   *
+   * @return PreferredScoringKeys which is used in computation of evenness 
score
+   */
+  public List<String> getPreferredScoringKeys() {
+    return _preferredScoringKeys;
+  }
+
   public Map<String, ResourceAssignment> getBaselineAssignment() {
     return _baselineAssignment == null || _baselineAssignment.isEmpty() ? 
Collections.emptyMap() : _baselineAssignment;
   }
@@ -190,10 +218,31 @@ public class ClusterContext {
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
 
+  /**
+   * Estimates the max utilization number from all capacity categories and 
their usages.
+   * If the list of preferredScoringKeys is specified then max utilization 
number is computed based on the
+   * specified capacity category (keys) in the list only.
+   *
+   * For example, if totalCapacity is {CPU: 0.6, MEM: 0.7, DISK: 0.9}, 
totalUsage is {CPU: 0.1, MEM: 0.2, DISK: 0.3},
+   * preferredScoringKeys: [ CPU ]. Then this call shall return 0.16. If 
preferredScoringKeys
+   * is not specified, this call returns 0.33 which would be the max 
utilization for the DISK.
+   *
+   * @param totalCapacity        Sum total of max capacity of all active nodes 
managed by a rebalancer
+   * @param totalUsage           Sum total of capacity usage of all partition 
replicas that are managed by the rebalancer
+   * @param preferredScoringKeys if provided, the max utilization will be 
calculated based on
+   *                             the supplied keys only, else across all 
capacity categories.
+   * @return The max utilization number from the specified capacity categories.
+   */
+
   private static float estimateMaxUtilization(Map<String, Integer> 
totalCapacity,
-      Map<String, Integer> totalUsage) {
+                                              Map<String, Integer> totalUsage,
+                                              List<String> 
preferredScoringKeys) {
     float estimatedMaxUsage = 0;
-    for (String capacityKey : totalCapacity.keySet()) {
+    Set<String> capacityKeySet = totalCapacity.keySet();
+    if (preferredScoringKeys != null && preferredScoringKeys.size() != 0 && 
capacityKeySet.contains(preferredScoringKeys.get(0))) {
+      capacityKeySet = 
preferredScoringKeys.stream().collect(Collectors.toSet());
+    }
+    for (String capacityKey : capacityKeySet) {
       int maxCapacity = totalCapacity.get(capacityKey);
       int usage = totalUsage.getOrDefault(capacityKey, 0);
       float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index dc825f2f7..ddd9880c0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -268,7 +268,7 @@ public class ClusterModelProvider {
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
         
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
-        assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment);
+        assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment, 
dataProvider.getClusterConfig());
 
     // Initial the cluster context with the allocated assignments.
     
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index e33b90204..628962166 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -154,7 +154,10 @@ public class ClusterConfig extends HelixProperty {
     HELIX_DISABLED_TYPE,
 
     // The last time when the on-demand rebalance is triggered.
-    LAST_ON_DEMAND_REBALANCE_TIMESTAMP
+    LAST_ON_DEMAND_REBALANCE_TIMESTAMP,
+
+    // List of Preferred scoring keys used in evenness score computation
+    PREFERRED_SCORING_KEYS
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -1198,4 +1201,25 @@ public class ClusterConfig extends HelixProperty {
     
_record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
         rebalanceTimestamp);
   }
+
+  /**
+   * Get the list of preferred scoring keys if set.
+   *
+   * @return PreferredScoringKeys that is used in computation of evenness score
+   */
+  public List<String> getPreferredScoringKeys() {
+    return 
_record.getListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name());
+  }
+
+  /**
+   * Set list of preferred scoring keys for cluster.
+   * preferredScoringKeys is set as a List to make it generic and accommodate 
any future use case.
+   * preferredScoringKeys will be a singleton list for current use case.
+   *
+   * @param preferredScoringKeys value used in evenness score computation
+   */
+  public void setPreferredScoringKeys(List<String> preferredScoringKeys) {
+    _record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name(),
+        preferredScoringKeys);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index f08371ad5..83faf42bc 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -19,6 +19,9 @@ package 
org.apache.helix.controller.rebalancer.waged.constraints;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
@@ -29,6 +32,8 @@ import org.testng.annotations.Test;
 import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.any;
 
 public class TestMaxCapacityUsageInstanceConstraint {
   private AssignableReplica _testReplica;
@@ -45,7 +50,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    
when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap(), 
any())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
     // Convert to float so as to compare with equal.
@@ -54,4 +59,19 @@ public class TestMaxCapacityUsageInstanceConstraint {
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
   }
+
+  @Test
+  public void testGetNormalizedScoreWithPreferredScoringKey() {
+    List<String> preferredScoringKeys = Collections.singletonList("CU");
+    when(_testNode.getGeneralProjectedHighestUtilization(anyMap(),
+        eq(preferredScoringKeys))).thenReturn(0.5f);
+    
when(_clusterContext.getPreferredScoringKeys()).thenReturn(preferredScoringKeys);
+    when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+    double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
+    // Convert to float so as to compare with equal.
+    Assert.assertEquals((float) score,0.5f);
+    double normalizedScore =
+            _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext);
+    Assert.assertTrue(normalizedScore > 0.99);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 947d0a18a..8150931fd 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -19,6 +19,9 @@ package 
org.apache.helix.controller.rebalancer.waged.constraints;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
@@ -26,6 +29,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -47,7 +52,7 @@ public class TestTopStateMaxCapacityUsageInstanceConstraint {
   @Test
   public void testGetNormalizedScore() {
     when(_testReplica.isReplicaTopState()).thenReturn(true);
-    
when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap(), 
any())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
     // Convert to float so as to compare with equal.
@@ -56,4 +61,21 @@ public class TestTopStateMaxCapacityUsageInstanceConstraint {
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
   }
+
+
+  @Test
+  public void testGetNormalizedScoreWithPreferredScoringKey() {
+    List<String> preferredScoringKeys = Collections.singletonList("CU");
+    when(_testReplica.isReplicaTopState()).thenReturn(true);
+    when(_testNode.getTopStateProjectedHighestUtilization(anyMap(),
+        eq(preferredScoringKeys))).thenReturn(0.5f);
+    
when(_clusterContext.getPreferredScoringKeys()).thenReturn(preferredScoringKeys);
+    when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
+    double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
+    // Convert to float so as to compare with equal.
+    Assert.assertEquals((float) score,0.5f);
+    double normalizedScore =
+            _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext);
+    Assert.assertTrue(normalizedScore > 0.99);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 7171755e9..7a9fe8a93 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -22,14 +22,18 @@ package org.apache.helix.controller.rebalancer.waged.model;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestClusterContext extends AbstractTestClusterModel {
@@ -94,4 +98,46 @@ public class TestClusterContext extends 
AbstractTestClusterModel {
     context
         .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), 
_partitionNames.get(0));
   }
+
+  @DataProvider(name = "preferredScoringKeys")
+  public static Object[][] preferredScoringKeys() {
+    return new Object[][]{
+            {Collections.singletonList("item1")},//valid key
+            {Collections.singletonList("item3")},//valid key
+            {Collections.singletonList("item-x")},//invalid key
+            {null}
+    };
+  }
+
+  @Test(dataProvider = "preferredScoringKeys")
+  public void testEstimateMaxUtilization(List<String> preferredScoringKeys) 
throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+    ClusterConfig clusterConfig = testCache.getClusterConfig();
+    clusterConfig.setPreferredScoringKeys(preferredScoringKeys);
+    ClusterContext context =
+            new ClusterContext(assignmentSet, generateNodes(testCache), new 
HashMap<>(),
+                    new HashMap<>(), clusterConfig);
+    /**
+     * Total Capacity and Total Usage values calculated from nodeSet and 
replicaSet above are as follows:
+     * TotalCapacity : {"item1",20, "item2",40, "item3",30}
+     * TotalUsage : {"item1",16, "item2",32, "item3",0}
+     * Using these values to validate the results of estimateMaxUtilization.
+     */
+
+    validateResult(ImmutableMap.of("item1", 20, "item2", 40, "item3", 30),
+            ImmutableMap.of("item1", 16, "item2", 32, "item3", 0),
+            preferredScoringKeys, context.getEstimatedMaxUtilization());
+  }
+
+  private void validateResult(Map<String, Integer> totalCapacity, Map<String, 
Integer> totalUsage,
+                              List<String> preferredScoringKeys, float 
actualEstimatedMaxUtilization) {
+    if (preferredScoringKeys == null || preferredScoringKeys.size() == 0 || 
!totalCapacity.keySet().contains(preferredScoringKeys.get(0))) {
+      //estimatedMaxUtilization calculated from all capacity keys
+      Assert.assertEquals(actualEstimatedMaxUtilization, 0.8f);
+      return;
+    }
+    //estimatedMaxUtilization calculated using preferredScoringKey only.
+    Assert.assertEquals(actualEstimatedMaxUtilization, (float) 
totalUsage.get(preferredScoringKeys.get(0)) / 
totalCapacity.get(preferredScoringKeys.get(0)));
+  }
 }

Reply via email to