Repository: helix Updated Branches: refs/heads/master 317c300c8 -> 3d2d57b05
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java new file mode 100644 index 0000000..5bd3bdb --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java @@ -0,0 +1,216 @@ +package org.apache.helix.util; + +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.api.config.RebalanceConfig; +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint; +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint; +import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.common.ResourcesStateMap; +import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.*; + +import java.util.*; + +/** + * A rebalance tool that generates a resource partition assignment based on the input. + * Note that the assignment won't be automatically applied to the cluster. Users are supposed to + * apply the change. + * + * @see org.apache.helix.examples.WeightAwareRebalanceUtilExample WeightAwareRebalanceUtilExample + */ +public class WeightAwareRebalanceUtil { + private final ClusterConfig _clusterConfig; + private final Map<String, InstanceConfig> _instanceConfigMap = new HashMap<>(); + // For the possible customized state models. + private final Map<String, StateModelDefinition> _stateModelDefs = new HashMap<>(); + private final ClusterDataCache _dataCache; + + private enum RebalanceOption { + INCREMENTAL, + FULL + } + + /** + * Init the rebalance util with cluster and instances information. 
+ * + * Note that it is not required to put any configuration items in these configs. + * However, in order to do topology aware rebalance, users need to set topology information such as Domain, fault zone, and TopologyAwareEnabled. + * + * The other config items will not be read or processed by the util. + * + * @param clusterConfig Cluster-level configuration. Note that its disabled-instance map is cleared by this constructor. + * @param instanceConfigs InstanceConfigs for all assignment candidates. + * Note that all instances will be treated as enabled and alive during the calculation. + */ + public WeightAwareRebalanceUtil(ClusterConfig clusterConfig, + List<InstanceConfig> instanceConfigs) { + for (InstanceConfig instanceConfig : instanceConfigs) { + // ensure the instance is enabled + instanceConfig.setInstanceEnabled(true); + _instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig); + } + // ensure no instance is disabled + clusterConfig.setDisabledInstances(Collections.<String, String>emptyMap()); + _clusterConfig = clusterConfig; + + _dataCache = new ClusterDataCache(); + _dataCache.setInstanceConfigMap(_instanceConfigMap); + _dataCache.setClusterConfig(_clusterConfig); + List<LiveInstance> liveInstanceList = new ArrayList<>(); + for (String instance : _instanceConfigMap.keySet()) { + LiveInstance liveInstance = new LiveInstance(instance); + liveInstanceList.add(liveInstance); + } + _dataCache.setLiveInstances(liveInstanceList); + } + + /** + * Generate partition assignments for all new resources or partitions that have not been assigned yet. + * Note that a partition assignment that does not fit the state model will still be recalculated. + * For example, if the replica requirement is 3, but one partition has only 2 replicas, this partition will still + * be rebalanced even if an existing assignment exists. + * + * @param resourceConfigs Config of all the resources that need to be rebalanced. + * The tool throws Exception if any resource has no IS or broken/uninitialized IS. 
+ * The tool throws HelixException if any resource is in full-auto mode. + * Following fields are required by the tool: + * 1. ResourceName + * 2. StateModelDefRef + * 3. PreferenceLists, which includes all partitions in the resource + * 4. NumReplica + * @param existingAssignment The existing partition assignment of the resources specified in param resourceConfigs. + * Unrelated resource assignment will be discarded. + * @param hardConstraints Hard constraints for rebalancing. + * @param softConstraints Soft constraints for rebalancing. + * + * @return The ResourcesStateMap holding the calculated partition state map of each resource in resourceConfigs + **/ + public ResourcesStateMap buildIncrementalRebalanceAssignment(List<ResourceConfig> resourceConfigs, + ResourcesStateMap existingAssignment, + List<? extends AbstractRebalanceHardConstraint> hardConstraints, + List<? extends AbstractRebalanceSoftConstraint> softConstraints) { + return calculateAssignment(resourceConfigs, existingAssignment, RebalanceOption.INCREMENTAL, + hardConstraints, softConstraints); + } + + /** + * Re-calculate the partition assignments for all the resources specified in resourceConfigs list. + * + * @param resourceConfigs Config of all the resources that need to be rebalanced. + * The tool throws Exception if any resource has no IS or broken/uninitialized IS. + * The tool throws HelixException if any resource is in full-auto mode. + * Following fields are required by the tool: + * 1. ResourceName + * 2. StateModelDefRef + * 3. PreferenceLists, which includes all partitions in the resource + * 4. NumReplica + * @param preferredAssignment A set of preferred partition assignments for the resources specified in param resourceConfigs. + * The preference is not guaranteed. + * NOTE(review): calculateAssignment only seeds the strategy with this mapping for the INCREMENTAL option, + * so for a FULL rebalance it currently appears to be unused; confirm whether that is intended. + * @param hardConstraints Hard constraints for rebalancing. + * @param softConstraints Soft constraints for rebalancing. 
+ * + * @return The ResourcesStateMap holding the calculated partition state map of each resource in resourceConfigs + **/ + public ResourcesStateMap buildFullRebalanceAssignment(List<ResourceConfig> resourceConfigs, + ResourcesStateMap preferredAssignment, + List<? extends AbstractRebalanceHardConstraint> hardConstraints, + List<? extends AbstractRebalanceSoftConstraint> softConstraints) { + return calculateAssignment(resourceConfigs, preferredAssignment, RebalanceOption.FULL, + hardConstraints, softConstraints); + } + + /** + * The method to generate partition assignment mappings. + * + * @param resourceConfigs Config of all the resources that need to be rebalanced. + * The tool throws Exception if any resource has no IS or broken/uninitialized IS. + * The tool throws HelixException if any resource is in full-auto mode. + * Following fields are required by the tool: + * 1. ResourceName + * 2. StateModelDefRef + * 3. PreferenceLists, which includes all partitions in the resource + * 4. NumReplica + * @param existingAssignment The existing partition assignment of the resources specified in param resourceConfigs. + * @param option INCREMENTAL or FULL + * INCREMENTAL: Keep existing assignment. Only generate new partition assignment. + * FULL: Completely re-assign resources' partitions. + * @param hardConstraints Hard constraints for rebalancing. + * @param softConstraints Soft constraints for rebalancing. + * + * @return The ResourcesStateMap holding the calculated partition state map of each resource in resourceConfigs + **/ + private ResourcesStateMap calculateAssignment(List<ResourceConfig> resourceConfigs, + ResourcesStateMap existingAssignment, RebalanceOption option, + List<? extends AbstractRebalanceHardConstraint> hardConstraints, + List<? 
extends AbstractRebalanceSoftConstraint> softConstraints) { + // check the inputs + for (ResourceConfig resourceConfig : resourceConfigs) { + RebalanceConfig.RebalanceMode rebalanceMode = + resourceConfig.getRebalanceConfig().getRebalanceMode(); + if (rebalanceMode.equals(RebalanceConfig.RebalanceMode.FULL_AUTO)) { + throw new HelixException( + "Resources that in FULL_AUTO mode are not supported: " + resourceConfig + .getResourceName()); + } + } + + ConstraintRebalanceStrategy constraintBasedStrategy = + new ConstraintRebalanceStrategy(hardConstraints, softConstraints); + + ResourcesStateMap resultAssignment = new ResourcesStateMap(); + + for (ResourceConfig resourceConfig : resourceConfigs) { + Map<String, Map<String, String>> preferredMapping = new HashMap<>(); + if (existingAssignment != null) { + PartitionStateMap partitionStateMap = existingAssignment.getPartitionStateMap(resourceConfig.getResourceName()); + // keep existing assignment if rebalance option is INCREMENTAL + if (option.equals(RebalanceOption.INCREMENTAL) && partitionStateMap != null) { + for (Partition partition : partitionStateMap.getStateMap().keySet()) { + preferredMapping.put(partition.getPartitionName(), partitionStateMap.getPartitionMap(partition)); + } + } + } + + StateModelDefinition stateModelDefinition = + getStateModelDef(resourceConfig.getStateModelDefRef()); + constraintBasedStrategy.init(resourceConfig.getResourceName(), + new ArrayList<>(resourceConfig.getPreferenceLists().keySet()), stateModelDefinition + .getStateCountMap(_instanceConfigMap.size(), + Integer.parseInt(resourceConfig.getNumReplica())), Integer.MAX_VALUE); + + List<String> instanceNames = new ArrayList<>(_instanceConfigMap.keySet()); + ZNRecord znRecord = constraintBasedStrategy + .computePartitionAssignment(instanceNames, instanceNames, preferredMapping, _dataCache); + Map<String, Map<String, String>> stateMap = znRecord.getMapFields(); + // Construct resource states result + PartitionStateMap newStateMap = new 
PartitionStateMap(resourceConfig.getResourceName()); + for (String partition : stateMap.keySet()) { + newStateMap.setState(new Partition(partition), stateMap.get(partition)); + } + resultAssignment.setState(resourceConfig.getResourceName(), newStateMap); + } + return resultAssignment; + } + + private StateModelDefinition getStateModelDef(String stateModelDefRef) { + if (_stateModelDefs.containsKey(stateModelDefRef)) { + return _stateModelDefs.get(stateModelDefRef); + } + return BuiltInStateModelDefinitions.valueOf(stateModelDefRef).getStateModelDefinition(); + } + + /** + * Since the tool is designed not to rely on ZK, if the application has customized state model, + * it needs to register to the tool before calling for an assignment. + * + * @param stateModelDefRef + * @param stateModelDefinition + */ + public void registerCustomizedStateModelDef(String stateModelDefRef, + StateModelDefinition stateModelDefinition) { + _stateModelDefs.put(stateModelDefRef, stateModelDefinition); + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java new file mode 100644 index 0000000..e85e65c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java @@ -0,0 +1,453 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint; +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint; +import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider; +import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider; +import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint; +import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockCapacityProvider; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockPartitionWeightProvider; +import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.*; + +public class TestConstraintRebalanceStrategy { + private static Logger _logger = LoggerFactory.getLogger(TestConstraintRebalanceStrategy.class); + + final String resourceNamePrefix = "resource"; + final int 
nParticipants = 40; + final int nResources = 20; + final int nPartitions = 100; + final int nReplicas = 3; + final int defaultCapacity = 6000; // total = 6000*40 = 240000 + final int resourceWeight = 10; // total = 20*100*3*10 = 60000 + final String topState = "ONLINE"; + + final List<String> resourceNames = new ArrayList<>(); + final List<String> instanceNames = new ArrayList<>(); + final List<String> partitions = new ArrayList<>(nPartitions); + + final ClusterDataCache cache = new ClusterDataCache(); + final LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2); + + @BeforeClass + public void beforeClass() { + for (int i = 0; i < nResources; i++) { + resourceNames.add(resourceNamePrefix + i); + } + for (int i = 0; i < nParticipants; i++) { + instanceNames.add("node" + i); + } + for (int i = 0; i < nPartitions; i++) { + partitions.add(Integer.toString(i)); + } + + setupMockCluster(); + } + + private void setupMockCluster() { + List<LiveInstance> liveInstanceList = new ArrayList<>(); + Map<String, InstanceConfig> instanceConfigs = new HashMap<>(); + for (String instance : instanceNames) { + LiveInstance liveInstance = new LiveInstance(instance); + liveInstanceList.add(liveInstance); + InstanceConfig config = new InstanceConfig(instance); + instanceConfigs.put(instance, config); + } + cache.setLiveInstances(liveInstanceList); + cache.setInstanceConfigMap(instanceConfigs); + ClusterConfig clusterConfig = new ClusterConfig("test"); + clusterConfig.setTopologyAwareEnabled(false); + cache.setClusterConfig(clusterConfig); + + states.put("OFFLINE", 0); + states.put(topState, nReplicas); + } + + private Map<String, Map<String, Map<String, String>>> calculateAssignment( + List<AbstractRebalanceHardConstraint> hardConstraints, + List<AbstractRebalanceSoftConstraint> softConstraints) { + Map<String, Map<String, Map<String, String>>> result = new HashMap<>(); + + ConstraintRebalanceStrategy strategy = + new ConstraintRebalanceStrategy(hardConstraints, 
softConstraints); + + for (String resourceName : resourceNames) { + Map<String, Map<String, String>> partitionMap = new HashMap<>(); + + strategy.init(resourceName, partitions, states, Integer.MAX_VALUE); + partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames, + new HashMap<String, Map<String, String>>(), cache).getMapFields()); + result.put(resourceName, partitionMap); + } + return result; + } + + private Map<String, Integer> checkPartitionUsage( + Map<String, Map<String, Map<String, String>>> assignment, + PartitionWeightProvider weightProvider) { + Map<String, Integer> weightCount = new HashMap<>(); + for (String resource : assignment.keySet()) { + Map<String, Map<String, String>> partitionMap = assignment.get(resource); + for (String partition : partitionMap.keySet()) { + // check states + Map<String, Integer> stateCount = new HashMap<>(states); + Map<String, String> stateMap = partitionMap.get(partition); + for (String state : stateMap.values()) { + Assert.assertTrue(stateCount.containsKey(state)); + stateCount.put(state, stateCount.get(state) - 1); + } + for (int count : stateCount.values()) { + Assert.assertEquals(count, 0); + } + + // report weight + int partitionWeight = weightProvider.getPartitionWeight(resource, partition); + for (String instance : partitionMap.get(partition).keySet()) { + if (!weightCount.containsKey(instance)) { + weightCount.put(instance, partitionWeight); + } else { + weightCount.put(instance, weightCount.get(instance) + partitionWeight); + } + } + } + } + return weightCount; + } + + @Test + public void testEvenness() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + TotalCapacityConstraint capacityConstraint = + new 
TotalCapacityConstraint(weightProvider, capacityProvider); + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + Map<String, Map<String, Map<String, String>>> assignment = calculateAssignment( + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. + Assert.assertTrue((max - min) <= defaultCapacity / 100); + } + + @Test + public void testEvennessByDefaultConstraint() { + Map<String, Map<String, Map<String, String>>> result = new HashMap<>(); + + ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(); + + for (String resourceName : resourceNames) { + Map<String, Map<String, String>> partitionMap = new HashMap<>(); + + strategy.init(resourceName, partitions, states, Integer.MAX_VALUE); + partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames, + new HashMap<String, Map<String, String>>(), cache).getMapFields()); + result.put(resourceName, partitionMap); + } + + Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() { + @Override + public int getPartitionWeight(String resource, String partition) { + return 1; + } + }); + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. 
+ Assert.assertTrue((max - min) <= defaultCapacity / 100); + } + + @Test + public void testCapacityAwareEvenness() { + // capacity / weight: instances get 1x, 2x or 3x the default capacity, totalBucket sums those multipliers + int totalBucket = 0; + Map<String, Integer> capacity = new HashMap<>(); + for (int i = 0; i < instanceNames.size(); i++) { + capacity.put(instanceNames.get(i), defaultCapacity * (1 + i % 3)); + totalBucket += 1 + i % 3; + } + int partitionWeightGranularity = (int) (resourceWeight * 1.5); + int totalPartitionWeight = 0; + Random ran = new Random(System.currentTimeMillis()); + Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>(); + for (String resource : resourceNames) { + Map<String, Integer> weights = new HashMap<>(); + for (String partition : partitions) { + int weight = resourceWeight / 2 + ran.nextInt(resourceWeight); + weights.put(partition, weight); + totalPartitionWeight += weight * nReplicas; + } + partitionWeightMap.put(resource, weights); + } + + PartitionWeightProvider weightProvider = + new MockPartitionWeightProvider(partitionWeightMap, resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + Map<String, Map<String, Map<String, String>>> assignment = + calculateAssignment(Collections.<AbstractRebalanceHardConstraint>emptyList(), + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + for (int i = 0; i < instanceNames.size(); i++) { + String instanceName = instanceNames.get(i); + // Divide in floating point and truncate once at the end; casting before the division would fall back to integer arithmetic. + int expectedUsage = (int) (((double) totalPartitionWeight) / totalBucket * (1 + i % 3)); + int realUsage = weightCount.get(instanceName); + // With different capacities, the calculation in the rebalance algorithm has more fractions, so loosen the restriction to 90% to 110% compared with the ideal value.
+ Assert.assertTrue((expectedUsage - partitionWeightGranularity) * 0.9 <= realUsage + && (expectedUsage + partitionWeightGranularity) * 1.1 >= realUsage); + } + } + + @Test + public void testHardConstraintFails() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + // insufficient capacity + capacity.put(instance, defaultCapacity / 100); + } + + PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + TotalCapacityConstraint capacityConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + + try { + calculateAssignment( + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.EMPTY_LIST); + Assert.fail("Assignment should fail because of insufficient capacity."); + } catch (IllegalStateException e) { + // expected + } + } + + @Test(dependsOnMethods = "testHardConstraintFails") + public void testConflictConstraint() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + // full default capacity here; the second constraint below is built with an empty capacity map and a default of 0 + capacity.put(instance, defaultCapacity); + } + + PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + TotalCapacityConstraint normalCapacityConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + TotalCapacityConstraint conflictingCapacityConstraint = + new TotalCapacityConstraint(weightProvider, + new MockCapacityProvider(Collections.EMPTY_MAP, 0)); + List<AbstractRebalanceHardConstraint> constraints = new ArrayList<>(); + constraints.add(normalCapacityConstraint); + constraints.add(conflictingCapacityConstraint); + + try { + calculateAssignment(constraints, Collections.EMPTY_LIST); + Assert.fail("Assignment should fail because of 
the conflicting capacity constraint."); + } catch (IllegalStateException e) { + // expected + } + } + + @Test(dependsOnMethods = "testEvenness") + public void testSoftConstraintFails() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + // insufficient capacity + capacity.put(instance, defaultCapacity / 50); + } + + PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + Map<String, Map<String, Map<String, String>>> assignment = + calculateAssignment(Collections.EMPTY_LIST, + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. + Assert.assertTrue((max - min) <= defaultCapacity / 100); + } + + @Test(dependsOnMethods = "testEvenness") + public void testRebalanceWithPreferredAssignment() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0); + + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + // inject valid partition assignment for one resources into preferred assignment. 
+ List<String> instances = instanceNames.subList(0, nReplicas); + Map<String, Map<String, String>> preferredPartitionAssignment = new HashMap<>(); + Map<String, String> replicaState = new HashMap<>(); + for (String instance : instances) { + replicaState.put(instance, topState); + } + preferredPartitionAssignment.put(partitions.get(0), replicaState); + Map<String, Map<String, Map<String, String>>> preferredAssignment = new HashMap<>(); + preferredAssignment.put(resourceNames.get(0), preferredPartitionAssignment); + + // inject invalid partition assignment for one resources into preferred assignment. + instances = instanceNames.subList(0, nReplicas - 1); + Map<String, String> invalidReplicaState = new HashMap<>(); + for (String instance : instances) { + invalidReplicaState.put(instance, topState); + } + preferredPartitionAssignment = new HashMap<>(); + preferredPartitionAssignment.put(partitions.get(0), invalidReplicaState); + preferredAssignment.put(resourceNames.get(1), preferredPartitionAssignment); + + Map<String, Map<String, Map<String, String>>> assignment = new HashMap<>(); + ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(Collections.EMPTY_LIST, + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + for (String resourceName : resourceNames) { + Map<String, Map<String, String>> partitionMap = new HashMap<>(); + + strategy.init(resourceName, partitions, states, Integer.MAX_VALUE); + partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames, + preferredAssignment.containsKey(resourceName) ? 
+ preferredAssignment.get(resourceName) : + Collections.EMPTY_MAP, cache).getMapFields()); + assignment.put(resourceName, partitionMap); + } + + // Even with preferred assignment, the weight should still be balance + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. + Assert.assertTrue((max - min) <= defaultCapacity / 100); + + // the resource 0 assignment should be kept the same + Collection<String> resource_0_Assignment = + assignment.get(resourceNames.get(0)).get(partitions.get(0)).keySet(); + Assert.assertTrue(resource_0_Assignment.containsAll(instanceNames.subList(0, nReplicas)) + && resource_0_Assignment.size() == nReplicas); + // the resource 1 assignment should be set to a valid one + Assert.assertTrue( + assignment.get(resourceNames.get(1)).get(partitions.get(0)).size() == nReplicas); + } + + @Test + public void testTopologyAwareAssignment() { + // Topology Aware configuration + ClusterDataCache cache = new ClusterDataCache(); + List<LiveInstance> liveInstanceList = new ArrayList<>(); + Map<String, InstanceConfig> instanceConfigs = new HashMap<>(); + for (int i = 0; i < instanceNames.size(); i++) { + String instance = instanceNames.get(i); + LiveInstance liveInstance = new LiveInstance(instance); + liveInstanceList.add(liveInstance); + InstanceConfig config = new InstanceConfig(instance); + config.setDomain(String.format("Rack=%s,Host=%s", i % (nParticipants / 5), instance)); + instanceConfigs.put(instance, config); + } + cache.setLiveInstances(liveInstanceList); + cache.setInstanceConfigMap(instanceConfigs); + ClusterConfig clusterConfig = new ClusterConfig("test"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/Rack/Host"); + clusterConfig.setFaultZoneType("Rack"); + 
cache.setClusterConfig(clusterConfig); + + Map<String, Map<String, Map<String, String>>> result = new HashMap<>(); + ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(); + + for (String resourceName : resourceNames) { + Map<String, Map<String, String>> partitionMap = new HashMap<>(); + + strategy.init(resourceName, partitions, states, Integer.MAX_VALUE); + partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames, + new HashMap<String, Map<String, String>>(), cache).getMapFields()); + result.put(resourceName, partitionMap); + } + + Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() { + @Override + public int getPartitionWeight(String resource, String partition) { + return defaultCapacity; + } + }); + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + Assert.assertTrue((max - min) <= defaultCapacity / 100); + + // check for domain assignment + Map<String, Set<String>> domainPartitionMap = new HashMap<>(); + for (Map<String, Map<String, String>> partitionMap : result.values()) { + domainPartitionMap.clear(); + for (String partition : partitionMap.keySet()) { + for (String instance : partitionMap.get(partition).keySet()) { + String domain = instanceConfigs.get(instance).getDomain().split(",")[0].split("=")[1]; + if (domainPartitionMap.containsKey(domain)) { + Assert.assertFalse(domainPartitionMap.get(domain).contains(partition)); + } else { + domainPartitionMap.put(domain, new HashSet<String>()); + } + domainPartitionMap.get(domain).add(partition); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java new file mode 100644 index 0000000..3235c0b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java @@ -0,0 +1,52 @@ +package org.apache.helix.controller.rebalancer.constraint.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider; + +import java.util.HashMap; +import java.util.Map; + +public class MockCapacityProvider implements CapacityProvider { + private final int _defaultCapacity; + private final Map<String, Integer> _capacityMap = new HashMap<>(); + private final Map<String, Integer> _usageMap = new HashMap<>(); + + public MockCapacityProvider(Map<String, Integer> capacityMap, int defaultCapacity) { + _capacityMap.putAll(capacityMap); + _defaultCapacity = defaultCapacity; + } + + @Override + public int getParticipantCapacity(String participant) { + if (_capacityMap.containsKey(participant)) { + return _capacityMap.get(participant); + } + return _defaultCapacity; + } + + @Override + public int getParticipantUsage(String participant) { + if (_usageMap.containsKey(participant)) { + return _usageMap.get(participant); + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java new file mode 100644 index 0000000..143e86e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java @@ -0,0 +1,50 @@ +package org.apache.helix.controller.rebalancer.constraint.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider; + +import java.util.HashMap; +import java.util.Map; + +public class MockPartitionWeightProvider implements PartitionWeightProvider { + private final int _defaultWeight; + private Map<String, Map<String, Integer>> _partitionWeightMap = new HashMap<>(); + + public MockPartitionWeightProvider(int defaultWeight) { + // use the default weight + _defaultWeight = defaultWeight; + } + + public MockPartitionWeightProvider(Map<String, Map<String, Integer>> partitionWeightMap, + int defaultWeight) { + _partitionWeightMap = partitionWeightMap; + _defaultWeight = defaultWeight; + } + + @Override + public int getPartitionWeight(String resource, String partition) { + if (_partitionWeightMap.containsKey(resource) && _partitionWeightMap.get(resource) + .containsKey(partition)) { + return _partitionWeightMap.get(resource).get(partition); + } + return _defaultWeight; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java b/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java new file mode 100644 index 
0000000..488fcab --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java @@ -0,0 +1,488 @@ +package org.apache.helix.integration; + +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.api.config.RebalanceConfig; +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint; +import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint; +import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider; +import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.common.ResourcesStateMap; +import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint; +import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockCapacityProvider; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockPartitionWeightProvider; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedCapacityProvider; +import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.model.*; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.util.WeightAwareRebalanceUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.*; + +import static org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider.DEFAULT_WEIGHT_VALUE; + +public class TestWeightBasedRebalanceUtil extends ZkIntegrationTestBase { + private static Logger _logger = 
LoggerFactory.getLogger(TestWeightBasedRebalanceUtil.class); + private static String CLUSTER_NAME; + private static ClusterSetup _setupTool; + + final String resourceNamePrefix = "resource"; + final int nParticipants = 40; + final int nResources = 20; + final int nPartitions = 100; + final int nReplicas = 3; + final int defaultCapacity = 6000; // total = 6000*40 = 240000 + final int resourceWeight = 10; // total = 20*100*3*10 = 60000 + final String topState = "ONLINE"; + + final List<String> resourceNames = new ArrayList<>(); + final List<String> instanceNames = new ArrayList<>(); + final List<String> partitions = new ArrayList<>(nPartitions); + final List<ResourceConfig> resourceConfigs = new ArrayList<>(); + + final LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2); + + final ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME); + final List<InstanceConfig> instanceConfigs = new ArrayList<>(); + + @BeforeClass + public void beforeClass() { + System.out.println( + "START " + getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis())); + + CLUSTER_NAME = "MockCluster" + getShortClassName(); + + for (int i = 0; i < nParticipants; i++) { + instanceNames.add("node" + i); + } + for (int i = 0; i < nPartitions; i++) { + partitions.add(Integer.toString(i)); + } + + for (int i = 0; i < nResources; i++) { + resourceNames.add(resourceNamePrefix + i); + ResourceConfig.Builder resourcBuilder = new ResourceConfig.Builder(resourceNamePrefix + i); + resourcBuilder.setStateModelDefRef("OnlineOffline"); + resourcBuilder.setNumReplica(nReplicas); + for (String partition : partitions) { + resourcBuilder.setPreferenceList(partition, Collections.EMPTY_LIST); + } + resourceConfigs.add(resourcBuilder.build()); + } + + setupMockCluster(); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(ZK_ADDR); + + // setup storage cluster + 
_setupTool.addCluster(CLUSTER_NAME, true); + } + + @AfterClass + public void afterClass() { + _setupTool.deleteCluster(CLUSTER_NAME); + } + + private void setupMockCluster() { + for (String instance : instanceNames) { + InstanceConfig config = new InstanceConfig(instance); + instanceConfigs.add(config); + } + + states.put("OFFLINE", 0); + states.put(topState, nReplicas); + } + + private Map<String, Integer> checkPartitionUsage(ResourcesStateMap assignment, + PartitionWeightProvider weightProvider) { + Map<String, Integer> weightCount = new HashMap<>(); + for (String resource : assignment.resourceSet()) { + PartitionStateMap partitionMap = assignment.getPartitionStateMap(resource); + for (Partition partition : partitionMap.partitionSet()) { + // check states + Map<String, Integer> stateCount = new HashMap<>(states); + Map<String, String> stateMap = partitionMap.getPartitionMap(partition); + for (String state : stateMap.values()) { + Assert.assertTrue(stateCount.containsKey(state)); + stateCount.put(state, stateCount.get(state) - 1); + } + for (int count : stateCount.values()) { + Assert.assertEquals(count, 0); + } + + // report weight + int partitionWeight = + weightProvider.getPartitionWeight(resource, partition.getPartitionName()); + for (String instance : partitionMap.getPartitionMap(partition).keySet()) { + if (!weightCount.containsKey(instance)) { + weightCount.put(instance, partitionWeight); + } else { + weightCount.put(instance, weightCount.get(instance) + partitionWeight); + } + } + } + } + return weightCount; + } + + private void validateWeight(PartitionWeightProvider provider) { + for (String resource : resourceNames) { + for (String partition : partitions) { + int weight = provider.getPartitionWeight(resource, partition); + if (resource.equals(resourceNames.get(0))) { + if (partition.equals(partitions.get(0))) { + Assert.assertEquals(weight, resourceWeight * 3); + } else { + Assert.assertEquals(weight, resourceWeight * 2); + } + } else if 
(resource.equals(resourceNames.get(1))) { + if (partition.equals(partitions.get(0))) { + Assert.assertEquals(weight, resourceWeight * 3); + } else { + Assert.assertEquals(weight, resourceWeight); + } + } else { + Assert.assertEquals(weight, resourceWeight); + } + } + } + } + + @Test + public void testRebalance() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + MockCapacityProvider capacityProvider = new MockCapacityProvider(capacity, defaultCapacity); + + MockPartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight); + + TotalCapacityConstraint capacityConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs); + ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null, + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. 
+ Assert.assertTrue((max - min) <= defaultCapacity / 100); + } + + @Test + public void testZkBasedCapacityProvider() { + Map<String, Integer> resourceDefaultWeightMap = new HashMap<>(); + resourceDefaultWeightMap.put(resourceNames.get(0), resourceWeight * 2); + Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>(); + partitionWeightMap + .put(resourceNames.get(0), Collections.singletonMap(partitions.get(0), resourceWeight * 3)); + partitionWeightMap + .put(resourceNames.get(1), Collections.singletonMap(partitions.get(0), resourceWeight * 3)); + + ZkBasedPartitionWeightProvider weightProvider = + new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Test"); + weightProvider.updateWeights(resourceDefaultWeightMap, partitionWeightMap, resourceWeight); + // verify before persist + validateWeight(weightProvider); + + // persist get values back + weightProvider.persistWeights(); + // verify after persist + weightProvider = new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Test"); + validateWeight(weightProvider); + + weightProvider = new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Fack"); + for (String resource : resourceNames) { + for (String partition : partitions) { + Assert.assertEquals(weightProvider.getPartitionWeight(resource, partition), + DEFAULT_WEIGHT_VALUE); + } + } + + // update with invalid value + weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, -1); + try { + weightProvider.persistWeights(); + Assert.fail("Should fail to persist invalid weight information."); + } catch (HelixException hex) { + // expected + } + + Map<String, Integer> capacity = new HashMap<>(); + Map<String, Integer> usage = new HashMap<>(); + for (int i = 0; i < instanceNames.size(); i++) { + capacity.put(instanceNames.get(i), defaultCapacity + i); + usage.put(instanceNames.get(i), i); + } + + ZkBasedCapacityProvider capacityProvider = + new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test"); + 
capacityProvider.updateCapacity(capacity, usage, defaultCapacity); + + for (String instance : instanceNames) { + Assert.assertEquals(capacityProvider.getParticipantCapacity(instance), + capacity.get(instance).intValue()); + Assert.assertEquals(capacityProvider.getParticipantUsage(instance), + usage.get(instance).intValue()); + } + + // persist get values back + capacityProvider.persistCapacity(); + capacityProvider = new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test"); + for (String instance : instanceNames) { + Assert.assertEquals(capacityProvider.getParticipantCapacity(instance), + capacity.get(instance).intValue()); + Assert.assertEquals(capacityProvider.getParticipantUsage(instance), + usage.get(instance).intValue()); + } + + // update usage + String targetInstanceName = instanceNames.get(0); + int newUsgae = 12345; + capacityProvider.updateCapacity(Collections.EMPTY_MAP, + Collections.singletonMap(targetInstanceName, newUsgae), defaultCapacity); + Assert.assertEquals(capacityProvider.getParticipantUsage(targetInstanceName), newUsgae); + // check again without updating ZK + capacityProvider = new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test"); + Assert.assertEquals(capacityProvider.getParticipantUsage(targetInstanceName), 0); + + // update with invalid value + capacityProvider.updateCapacity(Collections.EMPTY_MAP, Collections.EMPTY_MAP, -1); + try { + capacityProvider.persistCapacity(); + Assert.fail("Should fail to persist invalid weight information."); + } catch (HelixException hex) { + // expected + } + } + + @Test + public void testRebalanceUsingZkDataProvider() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + ZkBasedPartitionWeightProvider weightProvider = + new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight); + + 
ZkBasedCapacityProvider capacityProvider = + new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0); + + TotalCapacityConstraint capacityConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs); + ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null, + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. 
+ Assert.assertTrue((max - min) <= defaultCapacity / 100); + } + + @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider") + public void testRebalanceWithExistingUsage() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + Map<String, Integer> usage = new HashMap<>(); + for (int i = 0; i < instanceNames.size(); i++) { + String instance = instanceNames.get(i); + capacity.put(instance, defaultCapacity); + if (i % 7 == 0) { + usage.put(instance, defaultCapacity); + } + } + + ZkBasedPartitionWeightProvider weightProvider = + new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight); + + ZkBasedCapacityProvider capacityProvider = + new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + capacityProvider.updateCapacity(capacity, usage, 0); + + TotalCapacityConstraint hardConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs); + ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null, + Collections.<AbstractRebalanceHardConstraint>singletonList(hardConstraint), + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + + for (int i = 0; i < instanceNames.size(); i++) { + String instance = instanceNames.get(i); + if (i % 7 == 0) { + Assert.assertTrue(!weightCount.containsKey(instance)); + } else { + Assert.assertTrue(weightCount.get(instance) > 0); + } + } + } + + @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider") + public void testRebalanceOption() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for 
(String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + ZkBasedPartitionWeightProvider weightProvider = + new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight); + + ZkBasedCapacityProvider capacityProvider = + new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0); + + PartitionWeightAwareEvennessConstraint evenConstraint = + new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider); + + // Assume existing assignment + ResourcesStateMap existingAssignment = new ResourcesStateMap(); + String targetResource = resourceNames.get(0); + for (String partition : partitions) { + for (int i = 0; i < nReplicas; i++) { + existingAssignment + .setState(targetResource, new Partition(partition), instanceNames.get(i), topState); + } + } + + WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs); + + // INCREMENTAL + ResourcesStateMap assignment = + util.buildIncrementalRebalanceAssignment(resourceConfigs, existingAssignment, + Collections.EMPTY_LIST, + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + // check if the existingAssignment is changed + for (String partition : partitions) { + Assert.assertTrue( + assignment.getInstanceStateMap(targetResource, new Partition(partition)).keySet() + .containsAll(instanceNames.subList(0, nReplicas))); + } + // still need to check for balance + Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider); + int max = Collections.max(weightCount.values()); + int min = Collections.min(weightCount.values()); + // Since the accuracy of Default evenness constraint is 0.01, diff should be 1/100 of participant capacity in max. 
+ Assert.assertTrue((max - min) <= defaultCapacity / 100); + + // FULL + assignment = util.buildFullRebalanceAssignment(resourceConfigs, existingAssignment, + Collections.EMPTY_LIST, + Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint)); + // check if the existingAssignment is changed + for (String partition : partitions) { + Assert.assertFalse( + assignment.getInstanceStateMap(targetResource, new Partition(partition)).keySet() + .containsAll(instanceNames.subList(0, nReplicas))); + } + } + + @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider") + public void testInvalidInput() { + // capacity / weight + Map<String, Integer> capacity = new HashMap<>(); + for (String instance : instanceNames) { + capacity.put(instance, defaultCapacity); + } + + ZkBasedPartitionWeightProvider weightProvider = + new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight); + + ZkBasedCapacityProvider capacityProvider = + new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS"); + capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0); + + TotalCapacityConstraint capacityConstraint = + new TotalCapacityConstraint(weightProvider, capacityProvider); + + WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs); + + // Empty constraint + try { + util.buildIncrementalRebalanceAssignment(resourceConfigs, null, Collections.EMPTY_LIST, + Collections.EMPTY_LIST); + Assert.fail("Should fail due to empty constraint list."); + } catch (HelixException ex) { + // expected + } + + ResourceConfig.Builder invalidResourceBuilder = new ResourceConfig.Builder("InvalidResource"); + invalidResourceBuilder.setStateModelDefRef("OnlineOffline"); + invalidResourceBuilder.setNumPartitions(nPartitions); + invalidResourceBuilder.setNumReplica(nReplicas); + for (String partition : partitions) { + 
invalidResourceBuilder.setPreferenceList(partition, Collections.EMPTY_LIST); + } + + // Auto mode resource config + try { + invalidResourceBuilder + .setRebalanceConfig(new RebalanceConfig(new ZNRecord("InvalidResource"))); + invalidResourceBuilder.getRebalanceConfig() + .setRebalanceMode(RebalanceConfig.RebalanceMode.FULL_AUTO); + util.buildIncrementalRebalanceAssignment( + Collections.singletonList(invalidResourceBuilder.build()), null, + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.EMPTY_LIST); + Assert.fail("Should fail due to full auto resource config."); + } catch (HelixException ex) { + // expected + invalidResourceBuilder.getRebalanceConfig() + .setRebalanceMode(RebalanceConfig.RebalanceMode.CUSTOMIZED); + } + + // Auto mode resource config + try { + invalidResourceBuilder.setStateModelDefRef("CustomizedOnlineOffline"); + util.buildIncrementalRebalanceAssignment( + Collections.singletonList(invalidResourceBuilder.build()), null, + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.EMPTY_LIST); + Assert.fail("Should fail due to unknown state model def ref."); + } catch (IllegalArgumentException ex) { + // expected + util.registerCustomizedStateModelDef("CustomizedOnlineOffline", OnlineOfflineSMD.build()); + ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment( + Collections.singletonList(invalidResourceBuilder.build()), null, + Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint), + Collections.EMPTY_LIST); + checkPartitionUsage(assignment, weightProvider); + } + } +}
