Repository: helix Updated Branches: refs/heads/master 580f1facc -> 11b721350
[HELIX-751] TASK: Fix AssignableInstanceComparator so that it sorts unsupported quota types Currently, if the quota type does not exist, it will not sort AssignableInstances based on availability. This does not cause immediate problems, but it would be nice to have them sorted because we now allow unsupported quota types run as DEFAULT type. Changelist: 1. Comparator sorts AssignableInstances in a PriorityQueue by DEFAULT type's availability when the quota type given is unsupported Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/11b72135 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/11b72135 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/11b72135 Branch: refs/heads/master Commit: 11b721350091247cd3aa9716345ebdd53199ab53 Parents: 580f1fa Author: Hunter Lee <[email protected]> Authored: Wed Sep 19 10:11:31 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Fri Sep 21 13:40:20 2018 -0700 ---------------------------------------------------------------------- .../assigner/ThreadCountBasedTaskAssigner.java | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/11b72135/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java index 5d749fb..e7300f4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java @@ -31,9 +31,7 @@ import org.slf4j.LoggerFactory; public class ThreadCountBasedTaskAssigner implements TaskAssigner { private static final Logger logger = LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class); - private static final int SCHED_QUEUE_INIT_CAPACITY = 200; - private static final String DEFAULT_QUOTA_TYPE = "DEFAULT"; /** * Assigns given tasks to given AssignableInstances assuming the DEFAULT quota type for all tasks. @@ -43,7 +41,7 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner { */ public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances, Iterable<TaskConfig> tasks) { - return assignTasks(assignableInstances, tasks, DEFAULT_QUOTA_TYPE); + return assignTasks(assignableInstances, tasks, AssignableInstance.DEFAULT_QUOTA_TYPE); } /** @@ -74,7 +72,7 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner { if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) { // Sometimes null is stored as a String literal logger.warn("Quota type is null. Assigning it as DEFAULT type!"); - quotaType = DEFAULT_QUOTA_TYPE; + quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE; } logger.info("Assigning tasks with quota type {}", quotaType); @@ -94,8 +92,10 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner { } // TODO: Review this logic - // TODO: 1. It assumes that the only mode of failure is due to insufficient capacity. This assumption may not always be true. Verify - // TODO: 2. All TaskAssignResults will get failureReason/Description/TaskID for the first task that failed. This will need correction + // TODO: 1. It assumes that the only mode of failure is due to insufficient capacity. This + // assumption may not always be true. Verify + // TODO: 2. All TaskAssignResults will get failureReason/Description/TaskID for the first task + // that failed. This will need correction // Every time we try to assign the task to the least-used instance, if that fails, // we assume all subsequent tasks will fail with same reason if (lastFailure != null) { @@ -180,10 +180,14 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner { private Integer getRemainingUsage(Map<String, Map<String, Integer>> capacity, Map<String, Map<String, Integer>> used) { - if (capacity.containsKey(RESOURCE_TYPE) - && capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) { - return capacity.get(RESOURCE_TYPE).get(_quotaType) - - used.get(RESOURCE_TYPE).get(_quotaType); + if (capacity.containsKey(RESOURCE_TYPE)) { + String quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE; + if (capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) { + // If the quotaType is not supported, sort as DEFAULT because it will be assigned as + // DEFAULT + quotaType = _quotaType; + } + return capacity.get(RESOURCE_TYPE).get(quotaType) - used.get(RESOURCE_TYPE).get(quotaType); } return 0; }
