This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit f3ec63534d7382d83d7d8737498cf319280f0cab Author: Jiajun Wang <[email protected]> AuthorDate: Fri Sep 6 19:17:19 2019 -0700 Validate the instance capacity/partition weight configuration while constructing the assignable instances (#451) Compare the configure items with the required capacity keys that are defined in the cluster config when build the assignable instances. - According to the design, all the required capacity keys must appear in the instance capacity config. - As for the partition weights, the corresponding weight item will be filled with value 0 if the required capacity key is not specified in the resource config. --- .../rebalancer/waged/model/AssignableNode.java | 41 ++++++++++++---- .../rebalancer/waged/model/AssignableReplica.java | 27 ++++++++--- .../waged/model/ClusterModelProvider.java | 31 ++++++------- .../waged/model/AbstractTestClusterModel.java | 23 ++++----- .../rebalancer/waged/model/TestAssignableNode.java | 53 +++++++++++++-------- .../waged/model/TestAssignableReplica.java | 54 +++++++++++++++++----- 6 files changed, 153 insertions(+), 76 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 35c3c38..33677e5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -19,12 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ -import org.apache.helix.HelixException; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.InstanceConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -35,6 +29,12 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.helix.HelixException; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static java.lang.Math.max; /** @@ -91,10 +91,7 @@ public class AssignableNode { Collection<AssignableReplica> existingAssignment) { reset(); - Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap(); - if (instanceCapacity.isEmpty()) { - instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap(); - } + Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig); _currentCapacityMap.putAll(instanceCapacity); _faultZone = computeFaultZone(clusterConfig, instanceConfig); _instanceTags = new HashSet<>(instanceConfig.getTags()); @@ -213,6 +210,7 @@ public class AssignableNode { * The method dynamically returns the highest utilization number among all the capacity categories. * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall * return 0.9. + * * @return The highest utilization number of the node among all the capacity category. */ public float getHighestCapacityUtilization() { @@ -359,6 +357,29 @@ public class AssignableNode { // a NOP; in other words, this node will be treated as if it has unlimited capacity. } + /** + * Get and validate the instance capacity from instance config. + * + * @throws HelixException if any required capacity key is not configured in the instance config. + */ + private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig, + InstanceConfig instanceConfig) { + List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys(); + Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap(); + if (instanceCapacity.isEmpty()) { + instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap(); + } + // Remove all the non-required capacity items from the map. + instanceCapacity.keySet().retainAll(requiredCapacityKeys); + // All the required keys must exist in the instance config. + if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) { + throw new HelixException(String.format( + "The required capacity keys %s are not fully configured in the instance %s capacity map %s.", + requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString())); + } + return instanceCapacity; + } + @Override public int hashCode() { return _instanceName.hashCode(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java index ade04bf..537bf70 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java @@ -19,12 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ -import org.apache.helix.model.ResourceConfig; -import org.apache.helix.model.StateModelDefinition; - import java.io.IOException; +import java.util.List; import java.util.Map; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; + /** * This class represents a partition replication that needs to be allocated. */ @@ -40,18 +42,19 @@ public class AssignableReplica implements Comparable<AssignableReplica> { private final String _replicaState; /** + * @param clusterConfig The cluster config. * @param resourceConfig The resource config for the resource which contains the replication. * @param partitionName The replication's partition name. * @param replicaState The state of the replication. * @param statePriority The priority of the replication's state. */ - AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState, - int statePriority) { + AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig, + String partitionName, String replicaState, int statePriority) { _partitionName = partitionName; _replicaState = replicaState; _statePriority = statePriority; _resourceName = resourceConfig.getResourceName(); - _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig); + _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig, clusterConfig); _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag(); _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance(); } @@ -127,7 +130,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> { * Parse the resource config for the partition weight. */ private Map<String, Integer> fetchCapacityUsage(String partitionName, - ResourceConfig resourceConfig) { + ResourceConfig resourceConfig, ClusterConfig clusterConfig) { Map<String, Map<String, Integer>> capacityMap; try { capacityMap = resourceConfig.getPartitionCapacityMap(); @@ -146,6 +149,16 @@ public class AssignableReplica implements Comparable<AssignableReplica> { "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.", partitionName, resourceConfig.getResourceName())); } + + List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys(); + // Remove the non-required capacity items. + partitionCapacity.keySet().retainAll(requiredCapacityKeys); + // If any required capacity key is not configured in the resource config, fill the partition + // capacity map with 0 usage. + for (String capacityKey : requiredCapacityKeys) { + partitionCapacity.putIfAbsent(capacityKey, 0); + } + return partitionCapacity; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 61f5d8d..3570164 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -30,14 +38,6 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - /** * This util class generates Cluster Model object based on the controller's data cache. */ @@ -112,8 +112,8 @@ public class ClusterModelProvider { Map<String, ResourceAssignment> bestPossibleAssignment, Map<String, Set<AssignableReplica>> allocatedReplicas) { Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>(); - if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) - || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) { + if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges + .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) { // If the cluster topology has been modified, need to reassign all replicas toBeAssignedReplicas .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet())); @@ -125,11 +125,9 @@ public class ClusterModelProvider { // 2. if the resource does appear in the best possible assignment, need to reassign. if (clusterChanges .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet()) - .contains(resourceName) - || clusterChanges + .contains(resourceName) || clusterChanges .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet()) - .contains(resourceName) - || !bestPossibleAssignment.containsKey(resourceName)) { + .contains(resourceName) || !bestPossibleAssignment.containsKey(resourceName)) { toBeAssignedReplicas.addAll(replicas); continue; // go to check next resource } else { @@ -196,9 +194,10 @@ public class ClusterModelProvider { ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap, int instanceCount) { Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>(); + ClusterConfig clusterConfig = dataProvider.getClusterConfig(); for (String resourceName : resourceMap.keySet()) { - ResourceConfig config = dataProvider.getResourceConfig(resourceName); + ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName); IdealState is = dataProvider.getIdealState(resourceName); if (is == null) { throw new HelixException( @@ -220,7 +219,7 @@ public class ClusterModelProvider { String state = entry.getKey(); for (int i = 0; i < entry.getValue(); i++) { totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add( - new AssignableReplica(config, partition, state, + new AssignableReplica(clusterConfig, resourceConfig, partition, state, def.getStatePriorityMap().get(state))); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index d99a3fb..a8a5de5 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -19,6 +19,15 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; @@ -29,15 +38,6 @@ import org.apache.helix.model.ResourceConfig; import org.mockito.Mockito; import org.testng.annotations.BeforeClass; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import static org.mockito.Mockito.when; public abstract class AbstractTestClusterModel { @@ -104,6 +104,7 @@ public abstract class AbstractTestClusterModel { testClusterConfig.setMaxPartitionsPerInstance(5); testClusterConfig.setDisabledInstances(Collections.emptyMap()); testClusterConfig.setTopologyAwareEnabled(false); + testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet())); when(testCache.getClusterConfig()).thenReturn(testClusterConfig); // 3. Mock the live instance node for the default instance. @@ -179,8 +180,8 @@ public abstract class AbstractTestClusterModel { ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName()); // Construct one AssignableReplica for each partition in the current state. cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add( - new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(), - entry.getValue().equals("MASTER") ? 1 : 2))); + new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(), + entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2))); } return assignmentSet; } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index 34a03a9..92a6998 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -19,22 +19,24 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ -import org.apache.helix.HelixException; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.InstanceConfig; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + import static org.mockito.Mockito.when; public class TestAssignableNode extends AbstractTestClusterModel { @@ -87,9 +89,8 @@ public class TestAssignableNode extends AbstractTestClusterModel { expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size()); // Test 2 - release assignment from the AssignableNode - AssignableReplica removingReplica = - new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), - _partitionNames.get(2), "MASTER", 1); + AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(), + testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "MASTER", 1); expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2)); expectedCapacityMap.put("item1", 9); expectedCapacityMap.put("item2", 18); @@ -128,9 +129,8 @@ public class TestAssignableNode extends AbstractTestClusterModel { expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size()); // Test 3 - add assignment to the AssignableNode - AssignableReplica addingReplica = - new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), - _partitionNames.get(2), "SLAVE", 2); + AssignableReplica addingReplica = new AssignableReplica(testCache.getClusterConfig(), + testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "SLAVE", 2); expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2)); expectedCapacityMap.put("item1", 4); expectedCapacityMap.put("item2", 8); @@ -169,9 +169,9 @@ public class TestAssignableNode extends AbstractTestClusterModel { AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, Collections.emptyList()); - AssignableReplica removingReplica = - new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), - _partitionNames.get(2) + "non-exist", "MASTER", 1); + AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(), + testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist", + "MASTER", 1); // Release shall pass. assignableNode.release(removingReplica); @@ -184,9 +184,8 @@ public class TestAssignableNode extends AbstractTestClusterModel { AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet); - AssignableReplica duplicateReplica = - new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)), - _partitionNames.get(0), "SLAVE", 2); + AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(), + testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2); assignableNode.assign(duplicateReplica); } @@ -264,4 +263,18 @@ public class TestAssignableNode extends AbstractTestClusterModel { Collections.emptyList()); Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance testInstanceId capacity map \\{item2=40, item1=20, item3=30\\}.") + public void testIncompleteInstanceCapacity() { + ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); + List<String> requiredCapacityKeys = new ArrayList<>(_capacityDataMap.keySet()); + requiredCapacityKeys.add("AdditionalCapacityKey"); + testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys); + + InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); + testInstanceConfig.setInstanceCapacityMap(_capacityDataMap); + + new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId, + Collections.emptyList()); + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java index d069ced..a247537 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java @@ -19,16 +19,19 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ -import org.apache.helix.model.ResourceConfig; -import org.apache.helix.model.StateModelDefinition; -import org.testng.Assert; -import org.testng.annotations.Test; - import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.testng.Assert; +import org.testng.annotations.Test; + public class TestAssignableReplica { String resourceName = "Resource"; String partitionNamePrefix = "partition"; @@ -38,7 +41,7 @@ public class TestAssignableReplica { int slavePriority = 2; @Test - public void testConstructRepliaWithResourceConfig() throws IOException { + public void testConstructReplicaWithResourceConfig() throws IOException { // Init assignable replica with a basic config object Map<String, Integer> capacityDataMapResource1 = new HashMap<>(); capacityDataMapResource1.put("item1", 3); @@ -46,11 +49,13 @@ public class TestAssignableReplica { ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName); testResourceConfigResource.setPartitionCapacityMap( Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1)); + ClusterConfig testClusterConfig = new ClusterConfig("testCluster"); + testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(capacityDataMapResource1.keySet())); String partitionName = partitionNamePrefix + 1; AssignableReplica replica = - new AssignableReplica(testResourceConfigResource, partitionName, masterState, - masterPriority); + new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName, + masterState, masterPriority); Assert.assertEquals(replica.getResourceName(), resourceName); Assert.assertEquals(replica.getPartitionName(), partitionName); Assert.assertEquals(replica.getReplicaState(), masterState); @@ -79,14 +84,14 @@ public class TestAssignableReplica { .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), maxPartition); - replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState, - masterPriority); + replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName, + masterState, masterPriority); Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1); Assert.assertEquals(replica.getResourceInstanceGroupTag(), group); Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition); - replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState, - slavePriority); + replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName2, + slaveState, slavePriority); Assert.assertEquals(replica.getResourceName(), resourceName); Assert.assertEquals(replica.getPartitionName(), partitionName2); Assert.assertEquals(replica.getReplicaState(), slaveState); @@ -96,4 +101,29 @@ public class TestAssignableReplica { Assert.assertEquals(replica.getResourceInstanceGroupTag(), group); Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition); } + + @Test + public void testIncompletePartitionWeightConfig() throws IOException { + // Init assignable replica with a basic config object + Map<String, Integer> capacityDataMapResource = new HashMap<>(); + capacityDataMapResource.put("item1", 3); + capacityDataMapResource.put("item2", 6); + ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName); + testResourceConfigResource.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource)); + ClusterConfig testClusterConfig = new ClusterConfig("testCluster"); + List<String> requiredCapacityKeys = new ArrayList<>(capacityDataMapResource.keySet()); + // Remove one required key, so it becomes a unnecessary item. + String unnecessaryCapacityKey = requiredCapacityKeys.remove(0); + // Add one new required key, so it does not exist in the resource config. + String newCapacityKey = "newCapacityKey"; + requiredCapacityKeys.add(newCapacityKey); + testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys); + + AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, + partitionNamePrefix + 1, masterState, masterPriority); + Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys)); + Assert.assertEquals(replica.getCapacity().get(newCapacityKey).intValue(), 0); + Assert.assertFalse(replica.getCapacity().containsKey(unnecessaryCapacityKey)); + } }
