This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedImprove by this push:
     new 128ccde  Add TopStateUsage constraint to Waged (#1652)
128ccde is described below

commit 128ccded98295638320e45e148cb556bc61591c9
Author: Neal Sun <[email protected]>
AuthorDate: Mon Mar 1 13:40:04 2021 -0800

    Add TopStateUsage constraint to Waged (#1652)
    
    Add a new top-state weight-based constraint to WAGED to ensure that
    top-state weight is evenly distributed across instances.
    
    Co-authored-by: Neal Sun <[email protected]>
---
 .../ConstraintBasedAlgorithmFactory.java           |  7 +--
 .../MaxCapacityUsageInstanceConstraint.java        |  3 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++---
 .../rebalancer/waged/model/AssignableNode.java     | 61 ++++++++++++++++------
 .../rebalancer/waged/model/ClusterContext.java     | 36 ++++++++++---
 .../stages/CurrentStateComputationStage.java       |  2 +-
 .../TestMaxCapacityUsageInstanceConstraint.java    |  2 +-
 ...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++--
 .../rebalancer/waged/model/TestAssignableNode.java | 12 +++--
 .../rebalancer/waged/model/TestClusterContext.java |  4 ++
 .../WagedRebalancer/TestWagedRebalance.java        | 10 ++++
 11 files changed, 123 insertions(+), 45 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..33aa6c8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
-      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
+      put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 
3f);
+      put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
     }
   };
 
@@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory {
     List<SoftConstraint> softConstraints = ImmutableList
         .of(new PartitionMovementConstraint(), new 
InstancePartitionsCountConstraint(),
             new ResourcePartitionAntiAffinityConstraint(),
-            new ResourceTopStateAntiAffinityConstraint(), new 
MaxCapacityUsageInstanceConstraint());
+            new TopStateMaxCapacityUsageInstanceConstraint(),
+            new MaxCapacityUsageInstanceConstraint());
     Map<SoftConstraint, Float> softConstraintsWithWeight = 
Maps.toMap(softConstraints, key -> {
       String name = key.getClass().getSimpleName();
       float weight = MODEL.get(name);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 8f41f5e..7d74c26 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends 
UsageSoftConstraint {
   protected double getAssignmentScore(AssignableNode node, AssignableReplica 
replica,
       ClusterContext clusterContext) {
     float estimatedMaxUtilization = 
clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = 
node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization =
+        node.getGeneralProjectedHighestUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedMaxUtilization, 
projectedHighestUtilization);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
similarity index 69%
copy from 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
copy to 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 8f41f5e..1454253 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -23,20 +23,25 @@ import 
org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 
+
 /**
- * The constraint evaluates the score by checking the max used capacity key 
out of all the capacity
- * keys.
+ * Evaluate the proposed assignment according to the top state resource usage 
on the instance.
  * The higher the maximum usage value for the capacity key, the lower the 
score will be, implying
  * that it is that much less desirable to assign anything on the given node.
  * It is a greedy approach since it evaluates only on the most used capacity 
key.
  */
-class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
-
+class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
   @Override
   protected double getAssignmentScore(AssignableNode node, AssignableReplica 
replica,
       ClusterContext clusterContext) {
-    float estimatedMaxUtilization = 
clusterContext.getEstimatedMaxUtilization();
-    float projectedHighestUtilization = 
node.getProjectedHighestUtilization(replica.getCapacity());
-    return computeUtilizationScore(estimatedMaxUtilization, 
projectedHighestUtilization);
+    if (!replica.isReplicaTopState()) {
+      // For non top state replica, this constraint is not applicable.
+      // So return zero on any assignable node candidate.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = 
clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization =
+        node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, 
projectedHighestUtilization);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..aae2328 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> 
_currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available 
node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -92,12 +94,18 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the 
previous added records cannot be reverted
       addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity 
configuration.
       for (Map.Entry<String, Integer> capacity : 
replica.getCapacity().entrySet()) {
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
@@ -105,9 +113,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     }
 
     // Update the global state after all single replications' calculation is 
done.
-    for (String capacityKey : totalPartitionCapacity.keySet()) {
-      updateRemainingCapacity(capacityKey, 
totalPartitionCapacity.get(capacityKey));
-    }
+    updateRemainingCapacity(totalTopStatePartitionCapacity, 
_remainingTopStateCapacity, false);
+    updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false);
   }
 
   /**
@@ -116,8 +123,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    */
   void assign(AssignableReplica assignableReplica) {
     addToAssignmentRecord(assignableReplica);
-    assignableReplica.getCapacity().entrySet().stream()
-            .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), 
capacity.getValue()));
+    updateRemainingCapacity(assignableReplica.getCapacity(), 
_remainingCapacity, false);
+    if (assignableReplica.isReplicaTopState()) {
+      updateRemainingCapacity(assignableReplica.getCapacity(), 
_remainingTopStateCapacity, false);
+    }
   }
 
   /**
@@ -146,8 +155,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     }
 
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
-    removedReplica.getCapacity().entrySet().stream()
-        .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * 
entry.getValue()));
+    updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity, 
true);
+    if (removedReplica.isReplicaTopState()) {
+      updateRemainingCapacity(removedReplica.getCapacity(), 
_remainingTopStateCapacity, true);
+    }
   }
 
   /**
@@ -228,11 +239,30 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * @param newUsage the proposed new additional capacity usage.
    * @return The highest utilization number of the node among all the capacity 
category.
    */
-  public float getProjectedHighestUtilization(Map<String, Integer> newUsage) {
+  public float getGeneralProjectedHighestUtilization(Map<String, Integer> 
newUsage) {
+    return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+  }
+
+  /**
+   * Return the most concerning capacity utilization number for evenly 
partition assignment.
+   * The method dynamically calculates the projected highest utilization 
number among all the
+   * capacity categories assuming the new capacity usage is added to the node.
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 
0.6}. Then this call shall
+   * return 0.9.
+   * This function returns projected highest utilization for only top state 
partitions.
+   * @param newUsage the proposed new additional capacity usage.
+   * @return The highest utilization number of the node among all the capacity 
category.
+   */
+  public float getTopStateProjectedHighestUtilization(Map<String, Integer> 
newUsage) {
+    return getProjectedHighestUtilization(newUsage, 
_remainingTopStateCapacity);
+  }
+
+  private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
+      Map<String, Integer> remainingCapacity) {
     float highestCapacityUtilization = 0;
     for (String capacityKey : _maxAllowedCapacity.keySet()) {
       float capacityValue = _maxAllowedCapacity.get(capacityKey);
-      float utilization = (capacityValue - _remainingCapacity.get(capacityKey) 
+ newUsage
+      float utilization = (capacityValue - remainingCapacity.get(capacityKey) 
+ newUsage
           .getOrDefault(capacityKey, 0)) / capacityValue;
       highestCapacityUtilization = Math.max(highestCapacityUtilization, 
utilization);
     }
@@ -311,13 +341,12 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     }
   }
 
-  private void updateRemainingCapacity(String capacityKey, int usage) {
-    if (!_remainingCapacity.containsKey(capacityKey)) {
-      //if the capacityKey belongs to replicas does not exist in the 
instance's capacity,
-      // it will be treated as if it has unlimited capacity of that capacityKey
-      return;
-    }
-    _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - 
usage);
+  private void updateRemainingCapacity(Map<String, Integer> usedCapacity, 
Map<String, Integer> remainingCapacity,
+      boolean isRelease) {
+    int multiplier = isRelease ? -1 : 1;
+    // if the used capacity key does not exist in the node's capacity, ignore 
it
+    usedCapacity.forEach((capacityKey, capacityValue) -> 
remainingCapacity.compute(capacityKey,
+        (key, value) -> value == null ? null : value - multiplier * 
capacityValue));
   }
 
   /**
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..5bfd4d0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new 
HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = 
new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> 
totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? 
capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> 
totalUsage
             .compute(capacityEntry.getKey(),
@@ -87,18 +93,15 @@ public class ClusterContext {
         capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
             (k, v) -> (v == null) ? capacityEntry.getValue() : (v + 
capacityEntry.getValue()))));
 
+    // TODO: these variables correspond to one constraint each, and may become 
unnecessary if the
+    // constraints are not used. A better design is to make them pluggable.
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
-      float estimatedMaxUsage = 0;
-      for (String capacityKey : totalCapacity.keySet()) {
-        int maxCapacity = totalCapacity.get(capacityKey);
-        int usage = totalUsage.getOrDefault(capacityKey, 0);
-        float utilization = (maxCapacity == 0) ? 1 : (float) usage / 
maxCapacity;
-        estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
-      }
-      _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalUsage);
+      _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, 
totalTopStateUsage);
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
     _estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +138,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, 
String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, 
Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());
@@ -169,4 +176,17 @@ public class ClusterContext {
     // partitions. The later scenario is what we want to achieve.
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
+
+  private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+      Map<String, Integer> totalUsage) {
+    float estimatedMaxUsage = 0;
+    for (String capacityKey : totalCapacity.keySet()) {
+      int maxCapacity = totalCapacity.get(capacityKey);
+      int usage = totalUsage.getOrDefault(capacityKey, 0);
+      float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
+      estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+    }
+
+    return estimatedMaxUsage;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 49e5d8f..bda56ba 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
         for (AssignableNode node : clusterModel.getAssignableNodes().values()) 
{
           String instanceName = node.getInstanceName();
           // There is no new usage adding to this node, so an empty map is 
passed in.
-          double usage = 
node.getProjectedHighestUtilization(Collections.emptyMap());
+          double usage = 
node.getGeneralProjectedHighestUtilization(Collections.emptyMap());
           clusterStatusMonitor
               .updateInstanceCapacityStatus(instanceName, usage, 
node.getMaxCapacity());
         }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..f08371a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    
when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
     when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
     // Convert to float so as to compare with equal.
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
similarity index 82%
copy from 
helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
copy to 
helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..947d0a1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestMaxCapacityUsageInstanceConstraint {
+
+public class TestTopStateMaxCapacityUsageInstanceConstraint {
   private AssignableReplica _testReplica;
   private AssignableNode _testNode;
   private ClusterContext _clusterContext;
-  private final SoftConstraint _constraint = new 
MaxCapacityUsageInstanceConstraint();
+  private final SoftConstraint _constraint = new 
TopStateMaxCapacityUsageInstanceConstraint();
 
   @BeforeMethod
   public void setUp() {
@@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint {
 
   @Test
   public void testGetNormalizedScore() {
-    when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
-    when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+    when(_testReplica.isReplicaTopState()).thenReturn(true);
+    
when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+    when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
     double score = _constraint.getAssignmentScore(_testNode, _testReplica, 
_clusterContext);
     // Convert to float so as to compare with equal.
-    Assert.assertEquals((float) score,0.8f);
+    Assert.assertEquals((float) score, 0.8f);
     double normalizedScore =
         _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext);
     Assert.assertTrue(normalizedScore > 0.99);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 0245ffa..4570efd 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -68,8 +68,10 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     assignableNode.assignInitBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        8.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -109,8 +111,10 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
-    
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         11.0 / 20.0, 0.005);
+    
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -143,8 +147,10 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-    
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+    
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
         16.0 / 20.0, 0.005);
+    
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+        3.0 / 20.0, 0.005);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 6b2787c..7171755 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -68,6 +68,10 @@ public class TestClusterContext extends 
AbstractTestClusterModel {
         .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
             replica.getPartitionName()));
     Assert.assertEquals(context.getAssignmentForFaultZoneMap(), 
expectedFaultZoneMap);
+    // Capacity with "item1" key is the highest utilized. Among 4 partitions, 
their weights are
+    // 3, 5, 3, 5, so a total of 16/20 is used; the 2 master partitions have 
3, 5, so 8/20 used.
+    Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0, 
0.005);
+    Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 / 
20.0, 0.005);
 
     // release
     expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 80c63bc..bba94fc 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase {
             return super.getBestPossibleAssignment();
           }
         };
+
+    // Set test instance capacity and partition weights
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    String testCapacityKey = "TestCapacityKey";
+    
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
 100));
+    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
 1));
+    dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), 
clusterConfig);
   }
 
   protected void addInstanceConfig(String storageNodeName, int seqNo, int 
tagCount) {

Reply via email to