This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 2f22df63daccb86836fba411d65e78fbecf174ee Author: narendly <naren...@gmail.com> AuthorDate: Mon Feb 25 17:47:58 2019 -0800 [HELIX-793] TASK: Make TaskAssigner honor instance constraints Previously, ThreadCountBasedTaskAssigner was assigning to all AssignableInstances. This could potentially be problematic because some users may wish to use InstanceGroupTags, in which case we must filter out instances that do not have the appropriate tags. This RB adds logic that helps TaskAssigner honor such constraints. Changelist: 1. TaskAssigner only assigns to AssignableInstances contained in eligible instances 2. Add a test for this logic change --- .../helix/task/AssignableInstanceManager.java | 10 ++++ .../ThreadCountBasedTaskAssignmentCalculator.java | 2 +- .../apache/helix/task/assigner/TaskAssigner.java | 4 +- .../assigner/ThreadCountBasedTaskAssigner.java | 14 +++-- .../assigner/TestThreadCountBasedTaskAssigner.java | 67 +++++++++++++++++----- 5 files changed, 78 insertions(+), 19 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java index 242eab2..564a32b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java +++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java @@ -255,6 +255,15 @@ public class AssignableInstanceManager { } /** + * Returns an AssignableInstance object by name. + * @param instanceName + * @return + */ + public AssignableInstance getAssignableInstance(String instanceName) { + return _assignableInstanceMap.get(instanceName); + } + + /** * Returns all AssignableInstances that support a given quota type. 
* @param quotaType * @return unmodifiable set of AssignableInstances @@ -339,6 +348,7 @@ public class AssignableInstanceManager { public void assign(String instanceName, TaskAssignResult result) throws IllegalStateException { if (result != null && _assignableInstanceMap.containsKey(instanceName)) { _assignableInstanceMap.get(instanceName).assign(result); + _taskAssignResultMap.put(result.getTaskConfig().getId(), result); } if (_globalThreadBasedQuotaMap.containsKey(result.getQuotaType())) { diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java index 227b3d1..6727d9d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java @@ -94,7 +94,7 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc // Assign tasks to AssignableInstances Map<String, TaskAssignResult> taskAssignResultMap = - _taskAssigner.assignTasks(_assignableInstanceManager, taskConfigs, quotaType); + _taskAssigner.assignTasks(_assignableInstanceManager, instances, taskConfigs, quotaType); // TODO: Do this with Quota Manager is ready // Cache TaskAssignResultMap to prevent double-assign diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java index 60a7ad3..244ecd8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java @@ -19,6 +19,7 @@ package org.apache.helix.task.assigner; * under the License. 
*/ +import java.util.Collection; import java.util.Map; import org.apache.helix.task.AssignableInstanceManager; import org.apache.helix.task.TaskConfig; @@ -54,10 +55,11 @@ public interface TaskAssigner { * When an assignment decision is made, AssignableInstance.assign() must be called for the * instance to modify its internal capacity profile. * @param assignableInstanceManager AssignableInstanceManager + * @param instances instances to assign to (need this to honor instance group tags) * @param tasks TaskConfigs of the same quota type * @param quotaType quota type of the tasks * @return taskID -> TaskAssignmentResult mappings */ Map<String, TaskAssignResult> assignTasks(AssignableInstanceManager assignableInstanceManager, - Iterable<TaskConfig> tasks, String quotaType); + Collection<String> instances, Iterable<TaskConfig> tasks, String quotaType); } diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java index 92f56d4..146fd14 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java @@ -19,9 +19,11 @@ package org.apache.helix.task.assigner; * under the License. 
*/ +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.PriorityQueue; import org.apache.helix.model.LiveInstance; @@ -70,10 +72,14 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner { @Override public Map<String, TaskAssignResult> assignTasks( - AssignableInstanceManager assignableInstanceManager, Iterable<TaskConfig> tasks, - String quotaType) { - Iterable<AssignableInstance> assignableInstances = - assignableInstanceManager.getAssignableInstanceMap().values(); + AssignableInstanceManager assignableInstanceManager, Collection<String> instances, + Iterable<TaskConfig> tasks, String quotaType) { + Iterable<AssignableInstance> assignableInstances = new HashSet<>(); + // Only add the AssignableInstances that are also in instances + for (String instance : instances) { + ((HashSet<AssignableInstance>) assignableInstances) + .add(assignableInstanceManager.getAssignableInstance(instance)); + } if (tasks == null || !tasks.iterator().hasNext()) { logger.warn("No task to assign!"); diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java index 232b12f..8d389d7 100644 --- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java @@ -20,16 +20,20 @@ package org.apache.helix.task.assigner; */ import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.helix.common.caches.TaskDataCache; import org.apache.helix.model.ClusterConfig; import 
org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.task.AssignableInstanceManager; +import org.apache.helix.task.TaskAssignmentCalculator; import org.apache.helix.task.TaskConfig; import org.testng.Assert; import org.testng.annotations.Test; @@ -50,8 +54,8 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { List<TaskConfig> tasks = createTaskConfigs(taskCountPerType); // Assign - Map<String, TaskAssignResult> results = - assigner.assignTasks(assignableInstanceManager, tasks, quotaType); + Map<String, TaskAssignResult> results = assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), tasks, quotaType); // Check success assertAssignmentResults(results.values(), true); @@ -72,8 +76,9 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); int taskCount = 10; List<TaskConfig> tasks = createTaskConfigs(taskCount); - Map<String, TaskAssignResult> results = - assigner.assignTasks(new AssignableInstanceManager(), tasks, "Dummy"); + AssignableInstanceManager assignableInstanceManager = new AssignableInstanceManager(); + Map<String, TaskAssignResult> results = assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), tasks, "Dummy"); Assert.assertEquals(results.size(), taskCount); for (TaskAssignResult result : results.values()) { Assert.assertFalse(result.isSuccessful()); @@ -87,9 +92,9 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { public void testAssignmentFailureNoTask() { TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); AssignableInstanceManager assignableInstanceManager = createAssignableInstanceManager(1, 10); - Map<String, TaskAssignResult> results = assigner - .assignTasks(assignableInstanceManager, Collections.<TaskConfig>emptyList(), - 
AssignableInstance.DEFAULT_QUOTA_TYPE); + Map<String, TaskAssignResult> results = assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), + Collections.<TaskConfig> emptyList(), AssignableInstance.DEFAULT_QUOTA_TYPE); Assert.assertTrue(results.isEmpty()); } @@ -101,8 +106,8 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { AssignableInstanceManager assignableInstanceManager = createAssignableInstanceManager(2, 10); List<TaskConfig> tasks = createTaskConfigs(20); - Map<String, TaskAssignResult> results = - assigner.assignTasks(assignableInstanceManager, tasks, testQuotaTypes[0]); + Map<String, TaskAssignResult> results = assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), tasks, testQuotaTypes[0]); int successCnt = 0; int failCnt = 0; for (TaskAssignResult rst : results.values()) { @@ -128,8 +133,8 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { tasks.addAll(createTaskConfigs(10, false)); Collections.shuffle(tasks); - Map<String, TaskAssignResult> results = - assigner.assignTasks(assignableInstanceManager, tasks, testQuotaTypes[0]); + Map<String, TaskAssignResult> results = assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), tasks, testQuotaTypes[0]); Assert.assertEquals(results.size(), 10); assertAssignmentResults(results.values(), true); } @@ -144,6 +149,43 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { } } + @Test + public void testAssignmentToGivenInstances() { + int totalNumberOfInstances = 10; + int eligibleNumberOfInstances = 5; + String instanceNameFormat = "instance-%s"; + + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + AssignableInstanceManager assignableInstanceManager = createAssignableInstanceManager(10, 20); + List<TaskConfig> tasks = createTaskConfigs(100, false); + Set<String> eligibleInstances = 
new HashSet<>(); + + // Add only eligible number of instances + for (int i = 0; i < eligibleNumberOfInstances; i++) { + eligibleInstances.add(String.format(instanceNameFormat, i)); + } + + Map<String, TaskAssignResult> result = assigner.assignTasks(assignableInstanceManager, + eligibleInstances, tasks, testQuotaTypes[0]); + + for (int i = 0; i < totalNumberOfInstances; i++) { + String instance = String.format(instanceNameFormat, i); + Set<String> test = assignableInstanceManager.getAssignableInstance(instance) + .getCurrentAssignments(); + boolean isAssignmentEmpty = assignableInstanceManager.getAssignableInstance(instance) + .getCurrentAssignments().isEmpty(); + // Check that assignment only took place to eligible number of instances and that assignment + // did not happen to non-eligible AssignableInstances + if (i < eligibleNumberOfInstances) { + // Must have tasks assigned to these instances + Assert.assertFalse(isAssignmentEmpty); + } else { + // These instances should have no tasks assigned to them + Assert.assertTrue(isAssignmentEmpty); + } + } + } + private void profileAssigner(int assignBatchSize, int instanceCount, int taskCount) { int trail = 100; long totalTime = 0; @@ -160,6 +202,7 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { long start = System.currentTimeMillis(); for (int j = 0; j < taskCount / assignBatchSize; j++) { allResults.add(assigner.assignTasks(assignableInstanceManager, + assignableInstanceManager.getAssignableInstanceNames(), tasks.subList(j * assignBatchSize, (j + 1) * assignBatchSize), testQuotaTypes[0])); } long duration = System.currentTimeMillis() - start; @@ -197,7 +240,6 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { private AssignableInstanceManager createAssignableInstanceManager(int count, int threadCount) { AssignableInstanceManager assignableInstanceManager = new AssignableInstanceManager(); - List<AssignableInstance> instances = new ArrayList<>(); ClusterConfig 
clusterConfig = createClusterConfig(testQuotaTypes, testQuotaRatio, false); String instanceNameFormat = "instance-%s"; Map<String, LiveInstance> liveInstanceMap = new HashMap<>(); @@ -208,7 +250,6 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() }, new String[] { Integer.toString(threadCount) }, instanceName)); instanceConfigMap.put(instanceName, new InstanceConfig(instanceName)); - } assignableInstanceManager