This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedImprove by this push:
new 128ccde Add TopStateUsage constraint to Waged (#1652)
128ccde is described below
commit 128ccded98295638320e45e148cb556bc61591c9
Author: Neal Sun <[email protected]>
AuthorDate: Mon Mar 1 13:40:04 2021 -0800
Add TopStateUsage constraint to Waged (#1652)
Add new top state weight-based constraint to Waged to ensure top state
weight evenness.
Co-authored-by: Neal Sun <[email protected]>
---
.../ConstraintBasedAlgorithmFactory.java | 7 +--
.../MaxCapacityUsageInstanceConstraint.java | 3 +-
...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++---
.../rebalancer/waged/model/AssignableNode.java | 61 ++++++++++++++++------
.../rebalancer/waged/model/ClusterContext.java | 36 ++++++++++---
.../stages/CurrentStateComputationStage.java | 2 +-
.../TestMaxCapacityUsageInstanceConstraint.java | 2 +-
...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++--
.../rebalancer/waged/model/TestAssignableNode.java | 12 +++--
.../rebalancer/waged/model/TestClusterContext.java | 4 ++
.../WagedRebalancer/TestWagedRebalance.java | 10 ++++
11 files changed, 123 insertions(+), 45 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..33aa6c8 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory {
put(PartitionMovementConstraint.class.getSimpleName(), 2f);
put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
- put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
- put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
+ put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(),
3f);
+ put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
}
};
@@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory {
List<SoftConstraint> softConstraints = ImmutableList
.of(new PartitionMovementConstraint(), new
InstancePartitionsCountConstraint(),
new ResourcePartitionAntiAffinityConstraint(),
- new ResourceTopStateAntiAffinityConstraint(), new
MaxCapacityUsageInstanceConstraint());
+ new TopStateMaxCapacityUsageInstanceConstraint(),
+ new MaxCapacityUsageInstanceConstraint());
Map<SoftConstraint, Float> softConstraintsWithWeight =
Maps.toMap(softConstraints, key -> {
String name = key.getClass().getSimpleName();
float weight = MODEL.get(name);
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
index 8f41f5e..7d74c26 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
@@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends
UsageSoftConstraint {
protected double getAssignmentScore(AssignableNode node, AssignableReplica
replica,
ClusterContext clusterContext) {
float estimatedMaxUtilization =
clusterContext.getEstimatedMaxUtilization();
- float projectedHighestUtilization =
node.getProjectedHighestUtilization(replica.getCapacity());
+ float projectedHighestUtilization =
+ node.getGeneralProjectedHighestUtilization(replica.getCapacity());
return computeUtilizationScore(estimatedMaxUtilization,
projectedHighestUtilization);
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
similarity index 69%
copy from
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
copy to
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
index 8f41f5e..1454253 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java
@@ -23,20 +23,25 @@ import
org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
/**
- * The constraint evaluates the score by checking the max used capacity key
out of all the capacity
- * keys.
+ * Evaluate the proposed assignment according to the top state resource usage
on the instance.
* The higher the maximum usage value for the capacity key, the lower the
score will be, implying
* that it is that much less desirable to assign anything on the given node.
* It is a greedy approach since it evaluates only on the most used capacity
key.
*/
-class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
-
+class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint {
@Override
protected double getAssignmentScore(AssignableNode node, AssignableReplica
replica,
ClusterContext clusterContext) {
- float estimatedMaxUtilization =
clusterContext.getEstimatedMaxUtilization();
- float projectedHighestUtilization =
node.getProjectedHighestUtilization(replica.getCapacity());
- return computeUtilizationScore(estimatedMaxUtilization,
projectedHighestUtilization);
+ if (!replica.isReplicaTopState()) {
+ // For a non-top-state replica, this constraint is not applicable.
+ // So return zero on any assignable node candidate.
+ return 0;
+ }
+ float estimatedTopStateMaxUtilization =
clusterContext.getEstimatedTopStateMaxUtilization();
+ float projectedHighestUtilization =
+ node.getTopStateProjectedHighestUtilization(replica.getCapacity());
+ return computeUtilizationScore(estimatedTopStateMaxUtilization,
projectedHighestUtilization);
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..aae2328 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements
Comparable<AssignableNode> {
private Map<String, Map<String, AssignableReplica>>
_currentAssignedReplicaMap;
// A map of <capacity key, capacity value> that tracks the current available
node capacity
private Map<String, Integer> _remainingCapacity;
+ private Map<String, Integer> _remainingTopStateCapacity;
/**
* Update the node with a ClusterDataCache. This resets the current
assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements
Comparable<AssignableNode> {
// make a copy of max capacity
_maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
_remainingCapacity = new HashMap<>(instanceCapacity);
+ _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
_currentAssignedReplicaMap = new HashMap<>();
}
@@ -92,12 +94,18 @@ public class AssignableNode implements
Comparable<AssignableNode> {
* Using this function avoids the overhead of updating capacity repeatedly.
*/
void assignInitBatch(Collection<AssignableReplica> replicas) {
+ Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
// TODO: the exception could occur in the middle of for loop and the
previous added records cannot be reverted
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity
configuration.
for (Map.Entry<String, Integer> capacity :
replica.getCapacity().entrySet()) {
+ if (replica.isReplicaTopState()) {
+ totalTopStatePartitionCapacity.compute(capacity.getKey(),
+ (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+ : totalValue + capacity.getValue());
+ }
totalPartitionCapacity.compute(capacity.getKey(),
(key, totalValue) -> (totalValue == null) ? capacity.getValue()
: totalValue + capacity.getValue());
@@ -105,9 +113,8 @@ public class AssignableNode implements
Comparable<AssignableNode> {
}
// Update the global state after all single replications' calculation is
done.
- for (String capacityKey : totalPartitionCapacity.keySet()) {
- updateRemainingCapacity(capacityKey,
totalPartitionCapacity.get(capacityKey));
- }
+ updateRemainingCapacity(totalTopStatePartitionCapacity,
_remainingTopStateCapacity, false);
+ updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false);
}
/**
@@ -116,8 +123,10 @@ public class AssignableNode implements
Comparable<AssignableNode> {
*/
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
- assignableReplica.getCapacity().entrySet().stream()
- .forEach(capacity -> updateRemainingCapacity(capacity.getKey(),
capacity.getValue()));
+ updateRemainingCapacity(assignableReplica.getCapacity(),
_remainingCapacity, false);
+ if (assignableReplica.isReplicaTopState()) {
+ updateRemainingCapacity(assignableReplica.getCapacity(),
_remainingTopStateCapacity, false);
+ }
}
/**
@@ -146,8 +155,10 @@ public class AssignableNode implements
Comparable<AssignableNode> {
}
AssignableReplica removedReplica = partitionMap.remove(partitionName);
- removedReplica.getCapacity().entrySet().stream()
- .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 *
entry.getValue()));
+ updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity,
true);
+ if (removedReplica.isReplicaTopState()) {
+ updateRemainingCapacity(removedReplica.getCapacity(),
_remainingTopStateCapacity, true);
+ }
}
/**
@@ -228,11 +239,30 @@ public class AssignableNode implements
Comparable<AssignableNode> {
* @param newUsage the proposed new additional capacity usage.
* @return The highest utilization number of the node among all the capacity
category.
*/
- public float getProjectedHighestUtilization(Map<String, Integer> newUsage) {
+ public float getGeneralProjectedHighestUtilization(Map<String, Integer>
newUsage) {
+ return getProjectedHighestUtilization(newUsage, _remainingCapacity);
+ }
+
+ /**
+ * Return the most concerning capacity utilization number for even
partition assignment.
+ * The method dynamically calculates the projected highest utilization
number among all the
+ * capacity categories assuming the new capacity usage is added to the node.
+ * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK:
0.6}, then this call shall
+ * return 0.9.
+ * This function returns projected highest utilization for only top state
partitions.
+ * @param newUsage the proposed new additional capacity usage.
+ * @return The highest utilization number of the node among all the capacity
category.
+ */
+ public float getTopStateProjectedHighestUtilization(Map<String, Integer>
newUsage) {
+ return getProjectedHighestUtilization(newUsage,
_remainingTopStateCapacity);
+ }
+
+ private float getProjectedHighestUtilization(Map<String, Integer> newUsage,
+ Map<String, Integer> remainingCapacity) {
float highestCapacityUtilization = 0;
for (String capacityKey : _maxAllowedCapacity.keySet()) {
float capacityValue = _maxAllowedCapacity.get(capacityKey);
- float utilization = (capacityValue - _remainingCapacity.get(capacityKey)
+ newUsage
+ float utilization = (capacityValue - remainingCapacity.get(capacityKey)
+ newUsage
.getOrDefault(capacityKey, 0)) / capacityValue;
highestCapacityUtilization = Math.max(highestCapacityUtilization,
utilization);
}
@@ -311,13 +341,12 @@ public class AssignableNode implements
Comparable<AssignableNode> {
}
}
- private void updateRemainingCapacity(String capacityKey, int usage) {
- if (!_remainingCapacity.containsKey(capacityKey)) {
- //if the capacityKey belongs to replicas does not exist in the
instance's capacity,
- // it will be treated as if it has unlimited capacity of that capacityKey
- return;
- }
- _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) -
usage);
+ private void updateRemainingCapacity(Map<String, Integer> usedCapacity,
Map<String, Integer> remainingCapacity,
+ boolean isRelease) {
+ int multiplier = isRelease ? -1 : 1;
+ // if the used capacity key does not exist in the node's capacity, ignore
it
+ usedCapacity.forEach((capacityKey, capacityValue) ->
remainingCapacity.compute(capacityKey,
+ (key, value) -> value == null ? null : value - multiplier *
capacityValue));
}
/**
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..5bfd4d0 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
private final Map<String, Integer> _estimatedMaxPartitionByResource = new
HashMap<>();
// This estimation helps to ensure global resource usage evenness.
private final float _estimatedMaxUtilization;
+ // This estimation helps to ensure global resource top state usage evenness.
+ private final float _estimatedTopStateMaxUtilization;
// map{zoneName : map{resourceName : set(partitionNames)}}
private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap =
new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
int totalReplicas = 0;
int totalTopStateReplicas = 0;
Map<String, Integer> totalUsage = new HashMap<>();
+ Map<String, Integer> totalTopStateUsage = new HashMap<>();
Map<String, Integer> totalCapacity = new HashMap<>();
for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
for (AssignableReplica replica : entry.getValue()) {
if (replica.isReplicaTopState()) {
totalTopStateReplicas += 1;
+ replica.getCapacity().entrySet().stream().forEach(capacityEntry ->
totalTopStateUsage
+ .compute(capacityEntry.getKey(), (k, v) -> (v == null) ?
capacityEntry.getValue()
+ : (v + capacityEntry.getValue())));
}
replica.getCapacity().entrySet().stream().forEach(capacityEntry ->
totalUsage
.compute(capacityEntry.getKey(),
@@ -87,18 +93,15 @@ public class ClusterContext {
capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
(k, v) -> (v == null) ? capacityEntry.getValue() : (v +
capacityEntry.getValue()))));
+ // TODO: these variables correspond to one constraint each, and may become
unnecessary if the
+ // constraints are not used. A better design is to make them pluggable.
if (totalCapacity.isEmpty()) {
// If no capacity is configured, we treat the cluster as fully utilized.
_estimatedMaxUtilization = 1f;
+ _estimatedTopStateMaxUtilization = 1f;
} else {
- float estimatedMaxUsage = 0;
- for (String capacityKey : totalCapacity.keySet()) {
- int maxCapacity = totalCapacity.get(capacityKey);
- int usage = totalUsage.getOrDefault(capacityKey, 0);
- float utilization = (maxCapacity == 0) ? 1 : (float) usage /
maxCapacity;
- estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
- }
- _estimatedMaxUtilization = estimatedMaxUsage;
+ _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity,
totalUsage);
+ _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity,
totalTopStateUsage);
}
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas,
instanceCount);
_estimatedMaxTopStateCount =
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +138,10 @@ public class ClusterContext {
return _estimatedMaxUtilization;
}
+ public float getEstimatedTopStateMaxUtilization() {
+ return _estimatedTopStateMaxUtilization;
+ }
+
public Set<String> getPartitionsForResourceAndFaultZone(String resourceName,
String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId,
Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());
@@ -169,4 +176,17 @@ public class ClusterContext {
// partitions. The later scenario is what we want to achieve.
return (int) Math.floor((float) replicaCount / instanceCount);
}
+
+ private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+ Map<String, Integer> totalUsage) {
+ float estimatedMaxUsage = 0;
+ for (String capacityKey : totalCapacity.keySet()) {
+ int maxCapacity = totalCapacity.get(capacityKey);
+ int usage = totalUsage.getOrDefault(capacityKey, 0);
+ float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
+ estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+ }
+
+ return estimatedMaxUsage;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 49e5d8f..bda56ba 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
for (AssignableNode node : clusterModel.getAssignableNodes().values())
{
String instanceName = node.getInstanceName();
// There is no new usage adding to this node, so an empty map is
passed in.
- double usage =
node.getProjectedHighestUtilization(Collections.emptyMap());
+ double usage =
node.getGeneralProjectedHighestUtilization(Collections.emptyMap());
clusterStatusMonitor
.updateInstanceCapacityStatus(instanceName, usage,
node.getMaxCapacity());
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..f08371a 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
@@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint {
@Test
public void testGetNormalizedScore() {
- when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+
when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica,
_clusterContext);
// Convert to float so as to compare with equal.
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
similarity index 82%
copy from
helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
copy to
helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
index 5d52cb7..947d0a1 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java
@@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestMaxCapacityUsageInstanceConstraint {
+
+public class TestTopStateMaxCapacityUsageInstanceConstraint {
private AssignableReplica _testReplica;
private AssignableNode _testNode;
private ClusterContext _clusterContext;
- private final SoftConstraint _constraint = new
MaxCapacityUsageInstanceConstraint();
+ private final SoftConstraint _constraint = new
TopStateMaxCapacityUsageInstanceConstraint();
@BeforeMethod
public void setUp() {
@@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint {
@Test
public void testGetNormalizedScore() {
- when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
- when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f);
+ when(_testReplica.isReplicaTopState()).thenReturn(true);
+
when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f);
+ when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f);
double score = _constraint.getAssignmentScore(_testNode, _testReplica,
_clusterContext);
// Convert to float so as to compare with equal.
- Assert.assertEquals((float) score,0.8f);
+ Assert.assertEquals((float) score, 0.8f);
double normalizedScore =
_constraint.getAssignmentNormalizedScore(_testNode, _testReplica,
_clusterContext);
Assert.assertTrue(normalizedScore > 0.99);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 0245ffa..4570efd 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -68,8 +68,10 @@ public class TestAssignableNode extends
AbstractTestClusterModel {
assignableNode.assignInitBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(),
expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
16.0 / 20.0, 0.005);
+
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 8.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -109,8 +111,10 @@ public class TestAssignableNode extends
AbstractTestClusterModel {
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(),
expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
-
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
11.0 / 20.0, 0.005);
+
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 3.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
@@ -143,8 +147,10 @@ public class TestAssignableNode extends
AbstractTestClusterModel {
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(),
expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
-
Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP),
+
Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP),
16.0 / 20.0, 0.005);
+
Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP),
+ 3.0 / 20.0, 0.005);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
Assert.assertEquals(assignableNode.getMaxPartition(), 5);
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 6b2787c..7171755 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -68,6 +68,10 @@ public class TestClusterContext extends
AbstractTestClusterModel {
.addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
replica.getPartitionName()));
Assert.assertEquals(context.getAssignmentForFaultZoneMap(),
expectedFaultZoneMap);
+ // Capacity with "item1" key is the highest utilized. Among 4 partitions,
their weights are
+ // 3, 5, 3, 5, so a total of 16/20 is used; the 2 master partitions have
3, 5, so 8/20 used.
+ Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0,
0.005);
+ Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 /
20.0, 0.005);
// release
expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 80c63bc..bba94fc 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase {
return super.getBestPossibleAssignment();
}
};
+
+ // Set test instance capacity and partition weights
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ ClusterConfig clusterConfig =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+ String testCapacityKey = "TestCapacityKey";
+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
100));
+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
1));
+ dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(),
clusterConfig);
}
protected void addInstanceConfig(String storageNodeName, int seqNo, int
tagCount) {