This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch topStatePOC in repository https://gitbox.apache.org/repos/asf/helix.git
commit b49d7ae6eadd403a1dc8276917ac0e3e8f8eca92 Author: Neal Sun <[email protected]> AuthorDate: Mon Nov 23 18:27:26 2020 -0800 POC --- .../ConstraintBasedAlgorithmFactory.java | 2 +- .../ResourceTopStateUsageConstraint.java | 46 ++++++++++++++++++++++ .../rebalancer/waged/model/ClusterContext.java | 17 ++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index 934bfa7..237d16c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory { put(PartitionMovementConstraint.class.getSimpleName(), 2f); put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f); put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f); - put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f); + put(ResourceTopStateUsageConstraint.class.getSimpleName(), 3f); put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f); } }; diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java new file mode 100644 index 0000000..8ba9cdc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java @@ -0,0 +1,46 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + + +/** + * Evaluate the proposed assignment according to the top state resource usage on the instance. + * The higher the maximum usage value for the capacity key, the lower the score will be, implying + * that it is that much less desirable to assign anything on the given node. + * It is a greedy approach since it evaluates only on the most used capacity key. + */ +class ResourceTopStateUsageConstraint extends UsageSoftConstraint { + @Override + protected double getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + if (!replica.isReplicaTopState()) { + // For non top state replica, this constraint is not applicable. + // So return zero on any assignable node candidate. + return 0; + } + float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization(); + float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity()); + return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java index 46392c9..2f6650b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java @@ -43,6 +43,8 @@ public class ClusterContext { private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>(); // This estimation helps to ensure global resource usage evenness. private final float _estimatedMaxUtilization; + // This estimation helps to ensure global resource top state usage evenness. + private final float _estimatedTopStateMaxUtilization; // map{zoneName : map{resourceName : set(partitionNames)}} private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>(); @@ -63,6 +65,7 @@ public class ClusterContext { int totalReplicas = 0; int totalTopStateReplicas = 0; Map<String, Integer> totalUsage = new HashMap<>(); + Map<String, Integer> totalTopStateUsage = new HashMap<>(); Map<String, Integer> totalCapacity = new HashMap<>(); for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream() @@ -77,6 +80,9 @@ public class ClusterContext { for (AssignableReplica replica : entry.getValue()) { if (replica.isReplicaTopState()) { totalTopStateReplicas += 1; + replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage + .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue() + : (v + capacityEntry.getValue()))); } replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage .compute(capacityEntry.getKey(), @@ -90,15 +96,22 @@ public class ClusterContext { if (totalCapacity.isEmpty()) { // If no capacity is configured, we treat the cluster as fully utilized. _estimatedMaxUtilization = 1f; + _estimatedTopStateMaxUtilization = 1f; } else { float estimatedMaxUsage = 0; + float estimatedTopStateMaxUsage = 0; for (String capacityKey : totalCapacity.keySet()) { int maxCapacity = totalCapacity.get(capacityKey); int usage = totalUsage.getOrDefault(capacityKey, 0); float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity; estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization); + + int topStateUsage = totalTopStateUsage.getOrDefault(capacityKey, 0); + float topStateUtilization = (maxCapacity == 0) ? 1 : (float) topStateUsage / maxCapacity; + estimatedTopStateMaxUsage = Math.max(estimatedTopStateMaxUsage, topStateUtilization); } _estimatedMaxUtilization = estimatedMaxUsage; + _estimatedTopStateMaxUtilization = estimatedTopStateMaxUsage; } _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount); _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount); @@ -135,6 +148,10 @@ public class ClusterContext { return _estimatedMaxUtilization; } + public float getEstimatedTopStateMaxUtilization() { + return _estimatedTopStateMaxUtilization; + } + public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) { return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap()) .getOrDefault(resourceName, Collections.emptySet());
