This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 2f22df63daccb86836fba411d65e78fbecf174ee
Author: narendly <naren...@gmail.com>
AuthorDate: Mon Feb 25 17:47:58 2019 -0800

    [HELIX-793] TASK: Make TaskAssigner honor instance constraints
        Previously, ThreadCountBasedTaskAssigner was assigning to all 
AssignableInstances. This could potentially be problematic because some users 
may wish to use InstanceGroupTags, in which case we must filter out instances 
that do not have the appropriate tags. This RB adds a logic that helps 
TaskAssigner honor such constraints.
    
        Changelist:
        1. TaskAssigner only assigns to AssignableInstances contained in 
eligible instances
        2. Add a test for this logic change
---
 .../helix/task/AssignableInstanceManager.java      | 10 ++++
 .../ThreadCountBasedTaskAssignmentCalculator.java  |  2 +-
 .../apache/helix/task/assigner/TaskAssigner.java   |  4 +-
 .../assigner/ThreadCountBasedTaskAssigner.java     | 14 +++--
 .../assigner/TestThreadCountBasedTaskAssigner.java | 67 +++++++++++++++++-----
 5 files changed, 78 insertions(+), 19 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 242eab2..564a32b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -255,6 +255,15 @@ public class AssignableInstanceManager {
   }
 
   /**
+   * Returns an AssignableInstance object by name.
+   * @param instanceName
+   * @return
+   */
+  public AssignableInstance getAssignableInstance(String instanceName) {
+    return _assignableInstanceMap.get(instanceName);
+  }
+
+  /**
    * Returns all AssignableInstances that support a given quota type.
    * @param quotaType
    * @return unmodifiable set of AssignableInstances
@@ -339,6 +348,7 @@ public class AssignableInstanceManager {
   public void assign(String instanceName, TaskAssignResult result) throws 
IllegalStateException {
     if (result != null && _assignableInstanceMap.containsKey(instanceName)) {
       _assignableInstanceMap.get(instanceName).assign(result);
+      _taskAssignResultMap.put(result.getTaskConfig().getId(), result);
     }
 
     if (_globalThreadBasedQuotaMap.containsKey(result.getQuotaType())) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index 227b3d1..6727d9d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -94,7 +94,7 @@ public class ThreadCountBasedTaskAssignmentCalculator extends 
TaskAssignmentCalc
 
     // Assign tasks to AssignableInstances
     Map<String, TaskAssignResult> taskAssignResultMap =
-        _taskAssigner.assignTasks(_assignableInstanceManager, taskConfigs, 
quotaType);
+        _taskAssigner.assignTasks(_assignableInstanceManager, instances, 
taskConfigs, quotaType);
 
     // TODO: Do this with Quota Manager is ready
     // Cache TaskAssignResultMap to prevent double-assign
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java 
b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
index 60a7ad3..244ecd8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
@@ -19,6 +19,7 @@ package org.apache.helix.task.assigner;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Map;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.TaskConfig;
@@ -54,10 +55,11 @@ public interface TaskAssigner {
    * When an assignment decision is made, AssignableInstance.assign() must be 
called for the
    * instance to modify its internal capacity profile.
    * @param assignableInstanceManager AssignableInstanceManager
+   * @param instances instances to assign to (need this to honor instance 
group tags)
    * @param tasks TaskConfigs of the same quota type
    * @param quotaType quota type of the tasks
    * @return taskID -> TaskAssignmentResult mappings
    */
   Map<String, TaskAssignResult> assignTasks(AssignableInstanceManager 
assignableInstanceManager,
-      Iterable<TaskConfig> tasks, String quotaType);
+      Collection<String> instances, Iterable<TaskConfig> tasks, String 
quotaType);
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
index 92f56d4..146fd14 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -19,9 +19,11 @@ package org.apache.helix.task.assigner;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.PriorityQueue;
 import org.apache.helix.model.LiveInstance;
@@ -70,10 +72,14 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
 
   @Override
   public Map<String, TaskAssignResult> assignTasks(
-      AssignableInstanceManager assignableInstanceManager, 
Iterable<TaskConfig> tasks,
-      String quotaType) {
-    Iterable<AssignableInstance> assignableInstances =
-        assignableInstanceManager.getAssignableInstanceMap().values();
+      AssignableInstanceManager assignableInstanceManager, Collection<String> 
instances,
+      Iterable<TaskConfig> tasks, String quotaType) {
+    Iterable<AssignableInstance> assignableInstances = new HashSet<>();
+    // Only add the AssignableInstances that are also in instances
+    for (String instance : instances) {
+      ((HashSet<AssignableInstance>) assignableInstances)
+          .add(assignableInstanceManager.getAssignableInstance(instance));
+    }
 
     if (tasks == null || !tasks.iterator().hasNext()) {
       logger.warn("No task to assign!");
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
index 232b12f..8d389d7 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
@@ -20,16 +20,20 @@ package org.apache.helix.task.assigner;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.AssignableInstanceManager;
+import org.apache.helix.task.TaskAssignmentCalculator;
 import org.apache.helix.task.TaskConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -50,8 +54,8 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
       List<TaskConfig> tasks = createTaskConfigs(taskCountPerType);
 
       // Assign
-      Map<String, TaskAssignResult> results =
-          assigner.assignTasks(assignableInstanceManager, tasks, quotaType);
+      Map<String, TaskAssignResult> results = 
assigner.assignTasks(assignableInstanceManager,
+          assignableInstanceManager.getAssignableInstanceNames(), tasks, 
quotaType);
 
       // Check success
       assertAssignmentResults(results.values(), true);
@@ -72,8 +76,9 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
     TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
     int taskCount = 10;
     List<TaskConfig> tasks = createTaskConfigs(taskCount);
-    Map<String, TaskAssignResult> results =
-        assigner.assignTasks(new AssignableInstanceManager(), tasks, "Dummy");
+    AssignableInstanceManager assignableInstanceManager = new 
AssignableInstanceManager();
+    Map<String, TaskAssignResult> results = 
assigner.assignTasks(assignableInstanceManager,
+        assignableInstanceManager.getAssignableInstanceNames(), tasks, 
"Dummy");
     Assert.assertEquals(results.size(), taskCount);
     for (TaskAssignResult result : results.values()) {
       Assert.assertFalse(result.isSuccessful());
@@ -87,9 +92,9 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
   public void testAssignmentFailureNoTask() {
     TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
     AssignableInstanceManager assignableInstanceManager = 
createAssignableInstanceManager(1, 10);
-    Map<String, TaskAssignResult> results = assigner
-        .assignTasks(assignableInstanceManager, 
Collections.<TaskConfig>emptyList(),
-            AssignableInstance.DEFAULT_QUOTA_TYPE);
+    Map<String, TaskAssignResult> results = 
assigner.assignTasks(assignableInstanceManager,
+        assignableInstanceManager.getAssignableInstanceNames(),
+        Collections.<TaskConfig> emptyList(), 
AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertTrue(results.isEmpty());
   }
 
@@ -101,8 +106,8 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
     AssignableInstanceManager assignableInstanceManager = 
createAssignableInstanceManager(2, 10);
     List<TaskConfig> tasks = createTaskConfigs(20);
 
-    Map<String, TaskAssignResult> results =
-        assigner.assignTasks(assignableInstanceManager, tasks, 
testQuotaTypes[0]);
+    Map<String, TaskAssignResult> results = 
assigner.assignTasks(assignableInstanceManager,
+        assignableInstanceManager.getAssignableInstanceNames(), tasks, 
testQuotaTypes[0]);
     int successCnt = 0;
     int failCnt = 0;
     for (TaskAssignResult rst : results.values()) {
@@ -128,8 +133,8 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
     tasks.addAll(createTaskConfigs(10, false));
     Collections.shuffle(tasks);
 
-    Map<String, TaskAssignResult> results =
-        assigner.assignTasks(assignableInstanceManager, tasks, 
testQuotaTypes[0]);
+    Map<String, TaskAssignResult> results = 
assigner.assignTasks(assignableInstanceManager,
+        assignableInstanceManager.getAssignableInstanceNames(), tasks, 
testQuotaTypes[0]);
     Assert.assertEquals(results.size(), 10);
     assertAssignmentResults(results.values(), true);
   }
@@ -144,6 +149,43 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
     }
   }
 
+  @Test
+  public void testAssignmentToGivenInstances() {
+    int totalNumberOfInstances = 10;
+    int eligibleNumberOfInstances = 5;
+    String instanceNameFormat = "instance-%s";
+
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+    AssignableInstanceManager assignableInstanceManager = 
createAssignableInstanceManager(10, 20);
+    List<TaskConfig> tasks = createTaskConfigs(100, false);
+    Set<String> eligibleInstances = new HashSet<>();
+
+    // Add only eligible number of instances
+    for (int i = 0; i < eligibleNumberOfInstances; i++) {
+      eligibleInstances.add(String.format(instanceNameFormat, i));
+    }
+
+    Map<String, TaskAssignResult> result = 
assigner.assignTasks(assignableInstanceManager,
+        eligibleInstances, tasks, testQuotaTypes[0]);
+
+    for (int i = 0; i < totalNumberOfInstances; i++) {
+      String instance = String.format(instanceNameFormat, i);
+      Set<String> test = 
assignableInstanceManager.getAssignableInstance(instance)
+          .getCurrentAssignments();
+      boolean isAssignmentEmpty = 
assignableInstanceManager.getAssignableInstance(instance)
+          .getCurrentAssignments().isEmpty();
+      // Check that assignment only took place to eligible number of instances 
and that assignment
+      // did not happen to non-eligible AssignableInstances
+      if (i < eligibleNumberOfInstances) {
+        // Must have tasks assigned to these instances
+        Assert.assertFalse(isAssignmentEmpty);
+      } else {
+        // These instances should have no tasks assigned to them
+        Assert.assertTrue(isAssignmentEmpty);
+      }
+    }
+  }
+
   private void profileAssigner(int assignBatchSize, int instanceCount, int 
taskCount) {
     int trail = 100;
     long totalTime = 0;
@@ -160,6 +202,7 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
       long start = System.currentTimeMillis();
       for (int j = 0; j < taskCount / assignBatchSize; j++) {
         allResults.add(assigner.assignTasks(assignableInstanceManager,
+            assignableInstanceManager.getAssignableInstanceNames(),
             tasks.subList(j * assignBatchSize, (j + 1) * assignBatchSize), 
testQuotaTypes[0]));
       }
       long duration = System.currentTimeMillis() - start;
@@ -197,7 +240,6 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
 
   private AssignableInstanceManager createAssignableInstanceManager(int count, 
int threadCount) {
     AssignableInstanceManager assignableInstanceManager = new 
AssignableInstanceManager();
-    List<AssignableInstance> instances = new ArrayList<>();
     ClusterConfig clusterConfig = createClusterConfig(testQuotaTypes, 
testQuotaRatio, false);
     String instanceNameFormat = "instance-%s";
     Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
@@ -208,7 +250,6 @@ public class TestThreadCountBasedTaskAssigner extends 
AssignerTestBase {
           new String[] { 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
           new String[] { Integer.toString(threadCount) }, instanceName));
       instanceConfigMap.put(instanceName, new InstanceConfig(instanceName));
-
     }
 
     assignableInstanceManager

Reply via email to