This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch topStatePOC in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4916b89fe1c1d58428829f27982e86f34e902b1f Author: Jiajun Wang <[email protected]> AuthorDate: Mon Nov 30 16:08:25 2020 -0800 Fix the topstate capacity usage recording and add the test method to evaluate. --- .../ResourceTopStateUsageConstraint.java | 2 +- .../rebalancer/waged/model/AssignableNode.java | 37 +++++ .../WagedRebalancer/TestWagedRebalance.java | 180 +++++++++++++++++++-- 3 files changed, 209 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java index 8ba9cdc..1209c04 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java @@ -40,7 +40,7 @@ class ResourceTopStateUsageConstraint extends UsageSoftConstraint { return 0; } float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization(); - float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity()); + float projectedHighestUtilization = node.getProjectedHighestTopStateUtilization(replica.getCapacity()); return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index d3d014d..f8307fe 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> { private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap; // A map of <capacity key, capacity value> that tracks the current available node capacity private Map<String, Integer> _remainingCapacity; + private Map<String, Integer> _remainingTopStateCapacity; /** * Update the node with a ClusterDataCache. This resets the current assignment and recalculates @@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> { // make a copy of max capacity _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity); _remainingCapacity = new HashMap<>(instanceCapacity); + _remainingTopStateCapacity = new HashMap<>(instanceCapacity); _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); _currentAssignedReplicaMap = new HashMap<>(); } @@ -93,6 +95,7 @@ public class AssignableNode implements Comparable<AssignableNode> { */ void assignInitBatch(Collection<AssignableReplica> replicas) { Map<String, Integer> totalPartitionCapacity = new HashMap<>(); + Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>(); for (AssignableReplica replica : replicas) { // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted addToAssignmentRecord(replica); @@ -101,12 +104,18 @@ public class AssignableNode implements Comparable<AssignableNode> { totalPartitionCapacity.compute(capacity.getKey(), (key, totalValue) -> (totalValue == null) ? capacity.getValue() : totalValue + capacity.getValue()); + if (replica.isReplicaTopState()) { + totalTopStatePartitionCapacity.compute(capacity.getKey(), + (key, totalValue) -> (totalValue == null) ? capacity.getValue() + : totalValue + capacity.getValue()); + } } } // Update the global state after all single replications' calculation is done. for (String capacityKey : totalPartitionCapacity.keySet()) { updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey)); + updateRemainingTopStateCapacity(capacityKey, totalTopStatePartitionCapacity.get(capacityKey)); } } @@ -118,6 +127,10 @@ public class AssignableNode implements Comparable<AssignableNode> { addToAssignmentRecord(assignableReplica); assignableReplica.getCapacity().entrySet().stream() .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue())); + if (assignableReplica.isReplicaTopState()) { + assignableReplica.getCapacity().entrySet().stream() + .forEach(capacity -> updateRemainingTopStateCapacity(capacity.getKey(), capacity.getValue())); + } } /** @@ -148,6 +161,10 @@ public class AssignableNode implements Comparable<AssignableNode> { AssignableReplica removedReplica = partitionMap.remove(partitionName); removedReplica.getCapacity().entrySet().stream() .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue())); + if (removedReplica.isReplicaTopState()) { + removedReplica.getCapacity().entrySet().stream() + .forEach(entry -> updateRemainingTopStateCapacity(entry.getKey(), -1 * entry.getValue())); + } } /** @@ -239,6 +256,17 @@ public class AssignableNode implements Comparable<AssignableNode> { return highestCapacityUtilization; } + public float getProjectedHighestTopStateUtilization(Map<String, Integer> newUsage) { + float highestCapacityUtilization = 0; + for (String capacityKey : _maxAllowedCapacity.keySet()) { + float capacityValue = _maxAllowedCapacity.get(capacityKey); + float utilization = (capacityValue - _remainingTopStateCapacity.get(capacityKey) + newUsage + .getOrDefault(capacityKey, 0)) / capacityValue; + highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization); + } + return highestCapacityUtilization; + } + public String getInstanceName() { return _instanceName; } @@ -320,6 +348,15 @@ public class AssignableNode implements Comparable<AssignableNode> { _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage); } + private void updateRemainingTopStateCapacity(String capacityKey, int usage) { + if (!_remainingTopStateCapacity.containsKey(capacityKey)) { + //if the capacityKey belongs to replicas does not exist in the instance's capacity, + // it will be treated as if it has unlimited capacity of that capacityKey + return; + } + _remainingTopStateCapacity.put(capacityKey, _remainingTopStateCapacity.get(capacityKey) - usage); + } + /** * Get and validate the instance capacity from instance config. * @throws HelixException if any required capacity key is not configured in the instance config. diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index a7049f9..0e3e354 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -19,6 +19,7 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer; * under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -27,8 +28,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; +import org.apache.commons.math.stat.descriptive.moment.StandardDeviation; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; @@ -37,15 +41,19 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; @@ -53,12 +61,17 @@ import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.apache.helix.util.HelixUtil; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.helix.model.ResourceConfig.DEFAULT_PARTITION_KEY; + + public class TestWagedRebalance extends ZkTestBase { protected final int NUM_NODE = 6; protected static final int START_PORT = 12918; @@ -76,11 +89,8 @@ public class TestWagedRebalance extends ZkTestBase { private Set<String> _allDBs = new HashSet<>(); private int _replica = 3; - private static String[] _testModels = { - BuiltInStateModelDefinitions.OnlineOffline.name(), - BuiltInStateModelDefinitions.MasterSlave.name(), - BuiltInStateModelDefinitions.LeaderStandby.name() - }; + private static String[] _testModels = + {BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name()}; @BeforeClass public void beforeClass() throws Exception { @@ -677,8 +687,7 @@ public class TestWagedRebalance extends ZkTestBase { HelixClusterVerifier _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setDeactivatedNodeAwareness(true).setResources(_allDBs) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) - .build(); + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); try { Assert.assertTrue(_clusterVerifier.verify(5000)); } finally { @@ -722,8 +731,7 @@ public class TestWagedRebalance extends ZkTestBase { ZkHelixClusterVerifier _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setDeactivatedNodeAwareness(true).setResources(_allDBs) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) - .build(); + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); try { Assert.assertTrue(_clusterVerifier.verifyByPolling()); } finally { @@ -749,4 +757,158 @@ public class TestWagedRebalance extends ZkTestBase { } deleteCluster(CLUSTER_NAME); } + + public static void main(String[] args) + throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException { + String clusterName = "test"; + String zkAddr = "localhost:2181"; + + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + clientConfig.setZkSerializer(new ZNRecordSerializer()); + HelixZkClient zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig); + BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient); + + // Read cluster parameters from ZK + HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor); + ClusterConfig clusterConfig = + dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig()); + List<InstanceConfig> instanceConfigs = + dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true); + List<String> liveInstances = + instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList()); + List<IdealState> idealStates = + dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(), true); + List<ResourceConfig> resourceConfigs = + dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true); + + for (InstanceConfig instanceConfig : instanceConfigs) { + instanceConfig.setInstanceEnabled(true); + instanceConfig.getRecord().getMapFields() + .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyMap()).clear(); + instanceConfig.getRecord().getListFields() + .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyList()).clear(); + } + + clusterConfig.setInstanceCapacityKeys(Collections.singletonList("CU")); + + Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap(); + for (String key : clusterConfig.getInstanceCapacityKeys()) { + defaultInstanceCapacityMap + .put(key, clusterConfig.getDefaultInstanceCapacityMap().getOrDefault(key, 0) * 2); + } + clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap); + + List<IdealState> filteredIdealStates = new ArrayList<>(); + for (IdealState idealState : idealStates) { + if (idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) && idealState + .getStateModelDefRef().equals("MasterSlave")) { + filteredIdealStates.add(idealState); + } + } + + Map<String, ResourceAssignment> baselineResult = new HashMap<>(); + for (IdealState idealState : filteredIdealStates) { + ResourceAssignment resourceAssignment = new ResourceAssignment(idealState.getResourceName()); + Map<String, Map<String, String>> partitionMap = HelixUtil + .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances, idealState, + new ArrayList<>(idealState.getPartitionSet()), idealState.getRebalanceStrategy()); + for (String partition : partitionMap.keySet()) { + resourceAssignment.addReplicaMap(new Partition(partition), partitionMap.get(partition)); + } + baselineResult.put(idealState.getResourceName(), resourceAssignment); + } + outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, baselineResult); + + for (IdealState idealState : filteredIdealStates) { + idealState.setRebalancerClassName(WagedRebalancer.class.getName()); + } + + Map<String, ResourceAssignment> utilResult = HelixUtil + .getTargetAssignmentForWagedFullAuto(zkAddr, clusterConfig, instanceConfigs, liveInstances, + filteredIdealStates, resourceConfigs); + + outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, utilResult); + } + + private static void outputAssignments(HelixDataAccessor dataAccessor, ClusterConfig clusterConfig, + List<IdealState> filteredIdealStates, Map<String, ResourceAssignment> assignmentMap) + throws IOException { + Map<String, Integer> defaultWeightMap = clusterConfig.getDefaultPartitionWeightMap(); + + Map<String, Integer> partitionCountMap = new HashMap<>(); + Map<String, Integer> topStateCountMap = new HashMap<>(); + Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>(); + Map<String, Map<String, Integer>> topStateWeightMap = new HashMap<>(); + + for (IdealState idealState : filteredIdealStates) { + StateModelDefinition stateModelDefinition = + BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef()) + .getStateModelDefinition(); + + ResourceConfig resourceConfig = dataAccessor + .getProperty(dataAccessor.keyBuilder().resourceConfig(idealState.getResourceName())); + + Map<String, Map<String, Integer>> fullWeightMap = resourceConfig.getPartitionCapacityMap(); + + for (String partition : idealState.getPartitionSet()) { + Map<String, String> instanceStateMap = + assignmentMap.get(idealState.getResourceName()).getRecord().getMapField(partition); + Map<String, Integer> weightMap = new HashMap<>(defaultWeightMap); + weightMap.putAll( + fullWeightMap.getOrDefault(partition, fullWeightMap.get(DEFAULT_PARTITION_KEY))); + + for (String instanceName : instanceStateMap.keySet()) { + String state = instanceStateMap.get(instanceName); + if (state.equals(stateModelDefinition.getTopState())) { + topStateCountMap.put(instanceName, 1 + topStateCountMap.getOrDefault(instanceName, 0)); + + Map<String, Integer> topStateWeights = + topStateWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>()); + for (String key : weightMap.keySet()) { + topStateWeights.put(key, topStateWeights.getOrDefault(key, 0) + weightMap.get(key)); + } + } + + partitionCountMap.put(instanceName, 1 + partitionCountMap.getOrDefault(instanceName, 0)); + + Map<String, Integer> weights = + partitionWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>()); + for (String key : weightMap.keySet()) { + weights.put(key, weights.getOrDefault(key, 0) + weightMap.get(key)); + } + } + } + } + //System.out.println("Partition weights: " + partitionWeightMap); + //System.out.println("Topstate partition weights: " + topStateWeightMap); + + List<Integer> regWeightList = + partitionWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList()); + + List<Integer> weightList = + topStateWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList()); + + int max = weightList.stream().max(Integer::compareTo).get(); + int min = weightList.stream().min(Integer::compareTo).get(); + StandardDeviation standardDeviation = new StandardDeviation(); + + double[] nums = new double[weightList.size()]; + for (int i = 0; i < weightList.size(); i++) { + nums[i] = weightList.get(i); + } + double std = standardDeviation.evaluate(nums); + + System.out.println("Topstate Weights Max " + max + " Min " + min + " STD " + std); + + int regmax = regWeightList.stream().max(Integer::compareTo).get(); + int regmin = regWeightList.stream().min(Integer::compareTo).get(); + double[] regnums = new double[regWeightList.size()]; + for (int i = 0; i < regWeightList.size(); i++) { + regnums[i] = regWeightList.get(i); + } + double regstd = standardDeviation.evaluate(regnums); + + System.out.println("Regular Weights Max " + regmax + " Min " + regmin + " STD " + regstd); + } }
