This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4e9f6558249cd03f4c8687e04bfee66c1648ec56 Author: jiajunwang <[email protected]> AuthorDate: Fri Jul 26 11:42:52 2019 -0700 Adding the configuration items of the WAGED rebalancer. (#348) * Adding the configuration items of the WAGED rebalancer. Including: Instance Capacity Keys, Rebalance Preferences, Instance Capacity Details, Partition Capacity (the weight) Details. Also adding test to cover the new configuration items. --- .../java/org/apache/helix/model/ClusterConfig.java | 129 +++++++++++--- .../org/apache/helix/model/InstanceConfig.java | 62 +++++-- .../org/apache/helix/model/ResourceConfig.java | 139 ++++++++++++++- .../org/apache/helix/model/TestClusterConfig.java | 130 ++++++++++++++ .../org/apache/helix/model/TestInstanceConfig.java | 66 +++++++- .../org/apache/helix/model/TestResourceConfig.java | 186 +++++++++++++++++++++ 6 files changed, 669 insertions(+), 43 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 5efecc9..67411ca 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -19,12 +19,8 @@ package org.apache.helix.model; * under the License. */ +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; @@ -32,6 +28,12 @@ import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * Cluster configurations */ @@ -80,7 +82,19 @@ public class ClusterConfig extends HelixProperty { DISABLED_INSTANCES, // Specifies job types and used for quota allocation - QUOTA_TYPES + QUOTA_TYPES, + + // The required instance capacity keys for resource partition assignment calculation. + INSTANCE_CAPACITY_KEYS, + // The preference of the rebalance result. + // EVENNESS - Evenness of the resource utilization, partition, and top state distribution. + // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment. + REBALANCE_PREFERENCE + } + + public enum GlobalRebalancePreferenceKey { + EVENNESS, + LESS_MOVEMENT } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; @@ -95,6 +109,15 @@ public class ClusterConfig extends HelixProperty { public final static String TASK_QUOTA_RATIO_NOT_SET = "-1"; + // Default preference for all the aspects should be the same to ensure balanced setup. + public final static Map<GlobalRebalancePreferenceKey, Integer> + DEFAULT_GLOBAL_REBALANCE_PREFERENCE = + ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder() + .put(GlobalRebalancePreferenceKey.EVENNESS, 1) + .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build(); + private final static int MAX_REBALANCE_PREFERENCE = 10; + private final static int MIN_REBALANCE_PREFERENCE = 0; + /** * Instantiate for a specific cluster * @param cluster the cluster identifier @@ -113,21 +136,21 @@ public class ClusterConfig extends HelixProperty { /** * Set task quota type with the ratio of this quota. - * @param quotaType String + * @param quotaType String * @param quotaRatio int */ public void setTaskQuotaRatio(String quotaType, int quotaRatio) { if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) { _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>()); } - _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType, - Integer.toString(quotaRatio)); + _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) + .put(quotaType, Integer.toString(quotaRatio)); } /** * Set task quota type with the ratio of this quota. Quota ratio must be a String that is * parse-able into an int. - * @param quotaType String + * @param quotaType String * @param quotaRatio String */ public void setTaskQuotaRatio(String quotaType, String quotaRatio) { @@ -210,8 +233,8 @@ public class ClusterConfig extends HelixProperty { * @return */ public Boolean isPersistIntermediateAssignment() { - return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), - false); + return _record + .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false); } /** @@ -233,8 +256,8 @@ public class ClusterConfig extends HelixProperty { } public Boolean isPipelineTriggersDisabled() { - return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), - false); + return _record + .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); } /** @@ -403,8 +426,8 @@ public class ClusterConfig extends HelixProperty { * @return */ public int getNumOfflineInstancesForAutoExit() { - return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), - -1); + return _record + .getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1); } /** @@ -444,9 +467,7 @@ public class ClusterConfig extends HelixProperty { if (obj instanceof ClusterConfig) { ClusterConfig that = (ClusterConfig) obj; - if (this.getId().equals(that.getId())) { - return true; - } + return this.getId().equals(that.getId()); } return false; } @@ -490,8 +511,8 @@ public class ClusterConfig extends HelixProperty { } if (!configStrs.isEmpty()) { - _record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), - configStrs); + _record + .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs); } } @@ -579,7 +600,7 @@ public class ClusterConfig extends HelixProperty { public int getErrorPartitionThresholdForLoadBalance() { return _record.getIntField( ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), - DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); + DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); } /** @@ -658,6 +679,70 @@ public class ClusterConfig extends HelixProperty { } /** + * Set the required Instance Capacity Keys. + * @param capacityKeys + */ + public void setInstanceCapacityKeys(List<String> capacityKeys) { + if (capacityKeys == null || capacityKeys.isEmpty()) { + throw new IllegalArgumentException("The input instance capacity key list is empty."); + } + _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys); + } + + /** + * @return The required Instance Capacity Keys. If not configured, return an empty list. + */ + public List<String> getInstanceCapacityKeys() { + List<String> capacityKeys = _record.getListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()); + if (capacityKeys == null) { + return Collections.emptyList(); + } + return capacityKeys; + } + + /** + * Set the global rebalancer's assignment preference. + * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight. + * The ratio of the configured weights will determine the rebalancer's behavior. + */ + public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) { + Map<String, String> preferenceMap = new HashMap<>(); + + preference.entrySet().stream().forEach(entry -> { + if (entry.getValue() > MAX_REBALANCE_PREFERENCE + || entry.getValue() < MIN_REBALANCE_PREFERENCE) { + throw new IllegalArgumentException(String + .format("Invalid global rebalance preference configuration. Key %s, Value %d.", + entry.getKey().name(), entry.getValue())); + } + preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue())); + }); + + _record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap); + } + + /** + * Get the global rebalancer's assignment preference. + */ + public Map<GlobalRebalancePreferenceKey, Integer> getGlobalRebalancePreference() { + Map<String, String> preferenceStrMap = + _record.getMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name()); + if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) { + Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) { + if (!preferenceStrMap.containsKey(key.name())) { + // If any key is not configured with a value, return the default config. + return DEFAULT_GLOBAL_REBALANCE_PREFERENCE; + } + preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name()))); + } + return preference; + } + // If configuration is not complete, return the default one. + return DEFAULT_GLOBAL_REBALANCE_PREFERENCE; + } + + /** * Get IdealState rules defined in the cluster config. * @return */ diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index f65a1bd..88fd1dd 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -19,6 +19,14 @@ package org.apache.helix.model; * under the License. */ +import com.google.common.base.Splitter; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.ZNRecord; +import org.apache.helix.util.HelixUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -27,15 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - -import org.apache.helix.HelixException; -import org.apache.helix.HelixProperty; -import org.apache.helix.ZNRecord; -import org.apache.helix.util.HelixUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Splitter; +import java.util.stream.Collectors; /** * Instance configurations @@ -55,7 +55,8 @@ public class InstanceConfig extends HelixProperty { INSTANCE_WEIGHT, DOMAIN, DELAY_REBALANCE_ENABLED, - MAX_CONCURRENT_TASK + MAX_CONCURRENT_TASK, + INSTANCE_CAPACITY_MAP } public static final int WEIGHT_NOT_SET = -1; @@ -505,6 +506,47 @@ public class InstanceConfig extends HelixProperty { _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask); } + /** + * Get the instance capacity information from the map fields + * + * @return data map if it exists, or empty map + */ + public Map<String, Integer> getInstanceCapacityMap() { + Map<String, String> capacityData = + _record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name()); + + if (capacityData != null) { + return capacityData.entrySet().stream().collect( + Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue()))); + } + return Collections.emptyMap(); + } + + /** + * Set the instance capacity information with an Integer mapping + * @param capacityDataMap - map of instance capacity data + * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty + */ + public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap) + throws IllegalArgumentException { + if (capacityDataMap == null || capacityDataMap.size() == 0) { + throw new IllegalArgumentException("Capacity Data is empty"); + } + + Map<String, String> capacityData = new HashMap<>(); + + capacityDataMap.entrySet().stream().forEach(entry -> { + if (entry.getValue() < 0) { + throw new IllegalArgumentException(String + .format("Capacity Data contains a negative value: %s = %d", entry.getKey(), + entry.getValue())); + } + capacityData.put(entry.getKey(), Integer.toString(entry.getValue())); + }); + + _record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData); + } + @Override public boolean equals(Object obj) { if (obj instanceof InstanceConfig) { diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index 274640c..1ead08e 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -19,19 +19,23 @@ package org.apache.helix.model; * under the License. */ -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import java.util.TreeMap; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.api.config.RebalanceConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * Resource configurations */ @@ -53,7 +57,8 @@ public class ResourceConfig extends HelixProperty { RESOURCE_TYPE, GROUP_ROUTING_ENABLED, EXTERNAL_VIEW_DISABLED, - DELAY_REBALANCE_ENABLED + DELAY_REBALANCE_ENABLED, + PARTITION_CAPACITY_MAP } public enum ResourceConfigConstants { @@ -61,6 +66,10 @@ public class ResourceConfig extends HelixProperty { } private static final Logger _logger = LoggerFactory.getLogger(ResourceConfig.class.getName()); + private static final ObjectMapper _objectMapper = new ObjectMapper(); + + public static final String DEFAULT_PARTITION_KEY = "DEFAULT"; + /** * Instantiate for a specific instance * @@ -92,10 +101,24 @@ public class ResourceConfig extends HelixProperty { String stateModelDefRef, String stateModelFactoryName, String numReplica, int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag, Boolean helixEnabled, String resourceGroupName, String resourceType, - Boolean groupRoutingEnabled, Boolean externalViewDisabled, - RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig, + Boolean groupRoutingEnabled, Boolean externalViewDisabled, RebalanceConfig rebalanceConfig, + StateTransitionTimeoutConfig stateTransitionTimeoutConfig, Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields, Boolean p2pMessageEnabled) { + this(resourceId, monitorDisabled, numPartitions, stateModelDefRef, stateModelFactoryName, + numReplica, minActiveReplica, maxPartitionsPerInstance, instanceGroupTag, helixEnabled, + resourceGroupName, resourceType, groupRoutingEnabled, externalViewDisabled, rebalanceConfig, + stateTransitionTimeoutConfig, listFields, mapFields, p2pMessageEnabled, null); + } + + private ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartitions, + String stateModelDefRef, String stateModelFactoryName, String numReplica, + int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag, + Boolean helixEnabled, String resourceGroupName, String resourceType, + Boolean groupRoutingEnabled, Boolean externalViewDisabled, + RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig, + Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields, + Boolean p2pMessageEnabled, Map<String, Map<String, Integer>> partitionCapacityMap) { super(resourceId); if (monitorDisabled != null) { @@ -172,6 +195,15 @@ public class ResourceConfig extends HelixProperty { if (mapFields != null) { _record.setMapFields(mapFields); } + + if (partitionCapacityMap != null) { + try { + setPartitionCapacityMap(partitionCapacityMap); + } catch (IOException e) { + throw new IllegalArgumentException( + "Failed to set partition capacity. Invalid capacity configuration."); + } + } } @@ -350,6 +382,64 @@ public class ResourceConfig extends HelixProperty { } /** + * Get the partition capacity information from a JSON among the map fields. + * <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>> + * + * @return data map if it exists, or empty map + * @throws IOException - when JSON conversion fails + */ + public Map<String, Map<String, Integer>> getPartitionCapacityMap() throws IOException { + Map<String, String> partitionCapacityData = + _record.getMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name()); + Map<String, Map<String, Integer>> partitionCapacityMap = new HashMap<>(); + if (partitionCapacityData != null) { + for (String partition : partitionCapacityData.keySet()) { + Map<String, Integer> capacities = _objectMapper + .readValue(partitionCapacityData.get(partition), + new TypeReference<Map<String, Integer>>() { + }); + partitionCapacityMap.put(partition, capacities); + } + } + return partitionCapacityMap; + } + + /** + * Set the partition capacity information with a map <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>> + * + * @param partitionCapacityMap - map of partition capacity data + * @throws IllegalArgumentException - when any of the data value is a negative number or map is empty + * @throws IOException - when JSON parsing fails + */ + public void setPartitionCapacityMap(Map<String, Map<String, Integer>> partitionCapacityMap) + throws IllegalArgumentException, IOException { + if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) { + throw new IllegalArgumentException("Capacity Map is empty"); + } + if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) { + throw new IllegalArgumentException(String + .format("The default partition capacity with the default key %s is required.", + DEFAULT_PARTITION_KEY)); + } + + Map<String, String> newCapacityRecord = new HashMap<>(); + for (String partition : partitionCapacityMap.keySet()) { + Map<String, Integer> capacities = partitionCapacityMap.get(partition); + // Verify the input is valid + if (capacities.isEmpty()) { + throw new IllegalArgumentException("Capacity Data is empty"); + } + if (capacities.entrySet().stream().anyMatch(entry -> entry.getValue() < 0)) { + throw new IllegalArgumentException( + String.format("Capacity Data contains a negative value:%s", capacities.toString())); + } + newCapacityRecord.put(partition, _objectMapper.writeValueAsString(capacities)); + } + + _record.setMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), newCapacityRecord); + } + + /** * Put a set of simple configs. * * @param configsMap @@ -476,6 +566,7 @@ public class ResourceConfig extends HelixProperty { private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig; private Map<String, List<String>> _preferenceLists; private Map<String, Map<String, String>> _mapFields; + private Map<String, Map<String, Integer>> _partitionCapacityMap; public Builder(String resourceId) { _resourceId = resourceId; @@ -664,6 +755,23 @@ public class ResourceConfig extends HelixProperty { return _preferenceLists; } + public Builder setPartitionCapacity(Map<String, Integer> defaultCapacity) { + setPartitionCapacity(DEFAULT_PARTITION_KEY, defaultCapacity); + return this; + } + + public Builder setPartitionCapacity(String partition, Map<String, Integer> capacity) { + if (_partitionCapacityMap == null) { + _partitionCapacityMap = new HashMap<>(); + } + _partitionCapacityMap.put(partition, capacity); + return this; + } + + public Map<String, Integer> getPartitionCapacity(String partition) { + return _partitionCapacityMap.get(partition); + } + public Builder setMapField(String key, Map<String, String> fields) { if (_mapFields == null) { _mapFields = new TreeMap<>(); @@ -708,6 +816,19 @@ public class ResourceConfig extends HelixProperty { } } } + + if (_partitionCapacityMap != null) { + if (_partitionCapacityMap.keySet().stream() + .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) { + throw new IllegalArgumentException( + "Partition capacity is configured without the DEFAULT capacity!"); + } + if (_partitionCapacityMap.values().stream() + .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) { + throw new IllegalArgumentException( + "Partition capacity is configured with negative capacity value!"); + } + } } public ResourceConfig build() { @@ -718,7 +839,7 @@ public class ResourceConfig extends HelixProperty { _stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance, _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled, _externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists, - _mapFields, _p2pMessageEnabled); + _mapFields, _p2pMessageEnabled, _partitionCapacityMap); } } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java new file mode 100644 index 0000000..209b196 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -0,0 +1,130 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableList; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS; +import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT; + +public class TestClusterConfig { + + @Test + public void testGetCapacityKeys() { + List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random"); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.getRecord() + .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), keys); + + Assert.assertEquals(testConfig.getInstanceCapacityKeys(), keys); + } + + @Test + public void testGetCapacityKeysEmpty() { + ClusterConfig testConfig = new ClusterConfig("testId"); + Assert.assertEquals(testConfig.getInstanceCapacityKeys(), Collections.emptyList()); + } + + @Test + public void testSetCapacityKeys() { + List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random"); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setInstanceCapacityKeys(keys); + + Assert.assertEquals(keys, testConfig.getRecord() + .getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name())); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetCapacityKeysEmptyList() { + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setInstanceCapacityKeys(Collections.emptyList()); + } + + @Test + public void testGetRebalancePreference() { + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(EVENNESS, 5); + preference.put(LESS_MOVEMENT, 3); + + Map<String, String> mapFieldData = new HashMap<>(); + for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) { + mapFieldData.put(key.name(), String.valueOf(preference.get(key))); + } + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.getRecord() + .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), mapFieldData); + + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), preference); + } + + @Test + public void testGetRebalancePreferenceDefault() { + ClusterConfig testConfig = new ClusterConfig("testId"); + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(EVENNESS, 5); + testConfig.setGlobalRebalancePreference(preference); + + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + } + + @Test + public void testSetRebalancePreference() { + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(EVENNESS, 5); + preference.put(LESS_MOVEMENT, 3); + + Map<String, String> mapFieldData = new HashMap<>(); + for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) { + mapFieldData.put(key.name(), String.valueOf(preference.get(key))); + } + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setGlobalRebalancePreference(preference); + + Assert.assertEquals(testConfig.getRecord() + .getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()), + mapFieldData); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetRebalancePreferenceInvalidNumber() { + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(EVENNESS, -1); + preference.put(LESS_MOVEMENT, 3); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setGlobalRebalancePreference(preference); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java index 38b1c92..f0da05f 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java @@ -19,12 +19,14 @@ package org.apache.helix.model; * under the License. */ -import java.util.Map; - +import com.google.common.collect.ImmutableMap; import org.apache.helix.ZNRecord; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * Created with IntelliJ IDEA. @@ -58,4 +60,64 @@ public class TestInstanceConfig { Map<String, String> parsedDomain = instanceConfig.getDomainAsMap(); Assert.assertTrue(parsedDomain.isEmpty()); } + + @Test + public void testGetInstanceCapacityMap() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "3"); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(InstanceConfig.InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityDataMapString); + InstanceConfig testConfig = new InstanceConfig(rec); + + Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(capacityDataMap)); + } + + @Test + public void testGetInstanceCapacityMapEmpty() { + InstanceConfig testConfig = new InstanceConfig("testId"); + + Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(Collections.emptyMap())); + } + + @Test + public void testSetInstanceCapacityMap() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "3"); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + + Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty. + INSTANCE_CAPACITY_MAP.name()), capacityDataMapString); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty") + public void testSetInstanceCapacityMapEmpty() { + Map<String, Integer> capacityDataMap = new HashMap<>(); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Capacity Data contains a negative value: item3 = -3") + public void testSetInstanceCapacityMapInvalid() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", -3); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java new file mode 100644 index 0000000..8099486 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java @@ -0,0 +1,186 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.ZNRecord; +import org.codehaus.jackson.map.ObjectMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TestResourceConfig { + private static final ObjectMapper _objectMapper = new ObjectMapper(); + + @Test + public void testGetPartitionCapacityMap() throws IOException { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections + .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, + _objectMapper.writeValueAsString(capacityDataMap))); + ResourceConfig testConfig = new ResourceConfig(rec); + + Assert.assertTrue(testConfig.getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY) + .equals(capacityDataMap)); + } + + @Test + public void testGetPartitionCapacityMapEmpty() throws IOException { + ResourceConfig testConfig = new ResourceConfig("testId"); + + Assert.assertTrue(testConfig.getPartitionCapacityMap().equals(Collections.emptyMap())); + } + + @Test(expectedExceptions = IOException.class) + public void testGetPartitionCapacityMapInvalidJson() throws IOException { + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), + Collections.singletonMap("test", "gibberish")); + ResourceConfig testConfig = new ResourceConfig(rec); + + testConfig.getPartitionCapacityMap(); + } + + @Test(dependsOnMethods = "testGetPartitionCapacityMap", expectedExceptions = IOException.class) + public void testGetPartitionCapacityMapInvalidJsonType() throws IOException { + Map<String, String> capacityDataMap = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "three"); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections + .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, + _objectMapper.writeValueAsString(capacityDataMap))); + ResourceConfig testConfig = new ResourceConfig(rec); + + testConfig.getPartitionCapacityMap(); + } + + @Test + public void testSetPartitionCapacityMap() throws IOException { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY), + _objectMapper.writeValueAsString(capacityDataMap)); + } + + @Test + public void testSetMultiplePartitionCapacityMap() throws IOException { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map<String, Map<String, Integer>> totalCapacityMap = + ImmutableMap.of(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap, + "partition2", capacityDataMap, + "partition3", capacityDataMap); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap(totalCapacityMap); + + Assert.assertNull(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition1")); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY), + _objectMapper.writeValueAsString(capacityDataMap)); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition2"), + _objectMapper.writeValueAsString(capacityDataMap)); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition3"), + _objectMapper.writeValueAsString(capacityDataMap)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty") + public void testSetPartitionCapacityMapEmpty() throws IOException { + Map<String, Integer> capacityDataMap = new HashMap<>(); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.") + public void testSetPartitionCapacityMapWithoutDefault() throws IOException { + Map<String, Integer> capacityDataMap = new HashMap<>(); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap("Random", capacityDataMap)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data contains a negative value:.+") + public void testSetPartitionCapacityMapInvalid() throws IOException { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", -3); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + } + + @Test + public void testWithResourceBuilder() throws IOException { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig"); + builder.setPartitionCapacity(capacityDataMap); + builder.setPartitionCapacity("partition1", capacityDataMap); + + Assert.assertEquals( + builder.build().getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY), + capacityDataMap); + Assert.assertEquals( + builder.build().getPartitionCapacityMap().get("partition1"), + capacityDataMap); + Assert.assertNull( + builder.build().getPartitionCapacityMap().get("Random")); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.") + public void testWithResourceBuilderInvalidInput() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig"); + builder.setPartitionCapacity("Random", capacityDataMap); + + builder.build(); + } +}
