Repository: helix
Updated Branches:
  refs/heads/master 580f1facc -> 11b721350


[HELIX-751] TASK: Fix AssignableInstanceComparator so that it sorts unsupported 
quota types

Currently, if the quota type does not exist, it will not sort 
AssignableInstances based on availability. This does not cause immediate 
problems, but it would be nice to have them sorted because we now allow 
unsupported quota types run as DEFAULT type.
Changelist:
1. Comparator sorts AssignableInstances in a PriorityQueue by DEFAULT type's 
availability when the quota type given is unsupported


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/11b72135
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/11b72135
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/11b72135

Branch: refs/heads/master
Commit: 11b721350091247cd3aa9716345ebdd53199ab53
Parents: 580f1fa
Author: Hunter Lee <[email protected]>
Authored: Wed Sep 19 10:11:31 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Fri Sep 21 13:40:20 2018 -0700

----------------------------------------------------------------------
 .../assigner/ThreadCountBasedTaskAssigner.java  | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/11b72135/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
index 5d749fb..e7300f4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -31,9 +31,7 @@ import org.slf4j.LoggerFactory;
 
 public class ThreadCountBasedTaskAssigner implements TaskAssigner {
   private static final Logger logger = 
LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
-
   private static final int SCHED_QUEUE_INIT_CAPACITY = 200;
-  private static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
 
   /**
    * Assigns given tasks to given AssignableInstances assuming the DEFAULT 
quota type for all tasks.
@@ -43,7 +41,7 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
    */
   public Map<String, TaskAssignResult> 
assignTasks(Iterable<AssignableInstance> assignableInstances,
       Iterable<TaskConfig> tasks) {
-    return assignTasks(assignableInstances, tasks, DEFAULT_QUOTA_TYPE);
+    return assignTasks(assignableInstances, tasks, 
AssignableInstance.DEFAULT_QUOTA_TYPE);
   }
 
   /**
@@ -74,7 +72,7 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
     if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) 
{
       // Sometimes null is stored as a String literal
       logger.warn("Quota type is null. Assigning it as DEFAULT type!");
-      quotaType = DEFAULT_QUOTA_TYPE;
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
     }
 
     logger.info("Assigning tasks with quota type {}", quotaType);
@@ -94,8 +92,10 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
       }
 
       // TODO: Review this logic
-      // TODO: 1. It assumes that the only mode of failure is due to 
insufficient capacity. This assumption may not always be true. Verify
-      // TODO: 2. All TaskAssignResults will get 
failureReason/Description/TaskID for the first task that failed. This will need 
correction
+      // TODO: 1. It assumes that the only mode of failure is due to 
insufficient capacity. This
+      // assumption may not always be true. Verify
+      // TODO: 2. All TaskAssignResults will get 
failureReason/Description/TaskID for the first task
+      // that failed. This will need correction
       // Every time we try to assign the task to the least-used instance, if 
that fails,
       // we assume all subsequent tasks will fail with same reason
       if (lastFailure != null) {
@@ -180,10 +180,14 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
 
     private Integer getRemainingUsage(Map<String, Map<String, Integer>> 
capacity,
         Map<String, Map<String, Integer>> used) {
-      if (capacity.containsKey(RESOURCE_TYPE)
-          && capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) {
-        return capacity.get(RESOURCE_TYPE).get(_quotaType)
-            - used.get(RESOURCE_TYPE).get(_quotaType);
+      if (capacity.containsKey(RESOURCE_TYPE)) {
+        String quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+        if (capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) {
+          // If the quotaType is not supported, sort as DEFAULT because it 
will be assigned as
+          // DEFAULT
+          quotaType = _quotaType;
+        }
+        return capacity.get(RESOURCE_TYPE).get(quotaType) - 
used.get(RESOURCE_TYPE).get(quotaType);
       }
       return 0;
     }

Reply via email to