Repository: helix Updated Branches: refs/heads/master e44b29e03 -> 4c3ad2aec
[HELIX-718] implement ThreadCountBasedTaskAssigner Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c3ad2ae Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c3ad2ae Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c3ad2ae Branch: refs/heads/master Commit: 4c3ad2aecc07de97d5f1976a61858ddbe2f836ed Parents: e44b29e Author: Harry Zhang <[email protected]> Authored: Mon Jul 9 16:04:19 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Mon Jul 9 16:36:55 2018 -0700 ---------------------------------------------------------------------- .../helix/task/assigner/TaskAssignResult.java | 2 +- .../assigner/ThreadCountBasedTaskAssigner.java | 174 ++++++++++++++++ .../helix/task/assigner/AssignerTestBase.java | 69 +++++++ .../task/assigner/TestAssignableInstance.java | 38 +--- .../TestThreadCountBasedTaskAssigner.java | 206 +++++++++++++++++++ 5 files changed, 451 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java index 00d7db1..f81749c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java @@ -77,7 +77,7 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> { * @return instance name. Null if assignment was not successful */ public String getInstanceName() { - return _node.getInstanceName(); + return _node == null ? null : _node.getInstanceName(); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java new file mode 100644 index 0000000..ece7290 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java @@ -0,0 +1,174 @@ +package org.apache.helix.task.assigner; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Random; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.TaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThreadCountBasedTaskAssigner implements TaskAssigner { + private static final Logger logger = + LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class); + + private static final int SCHED_QUEUE_INIT_CAPACITY = 200; + + /** + * This is a simple task assigning algorithm that uses the following assumptions to achieve + * efficiency in assigning tasks: + * 1. All tasks have same quota type + * 2. All tasks only need 1 thread for assignment, no other things to consider + * + * The algorithm ensures the spread-out of tasks with same quota type or tasks from same job, with + * best effort. + * NOTE: once we have more things to consider during scheduling, we will need to come up with + * a more generic task assignment algorithm + * @param assignableInstances String -> AssignableInstanceMapping + * @param tasks String -> TaskConfig + * @return taskID -> TaskAssignmentResult mapping per task + */ + public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances, + Iterable<TaskConfig> tasks) { + if (tasks == null || !tasks.iterator().hasNext()) { + logger.warn("No task to assign!"); + return Collections.emptyMap(); + } + if (assignableInstances == null || !assignableInstances.iterator().hasNext()) { + logger.warn("No instance to assign!"); + return buildNoInstanceAssignment(tasks); + } + + // get quota type + String quotaType = tasks.iterator().next().getQuotaType(); + logger.info("Assigning tasks with quota type {}", quotaType); + + // Build a sched queue + PriorityQueue<AssignableInstance> queue = buildSchedQueue(quotaType, assignableInstances); + + // Assign + Map<String, TaskAssignResult> assignResults = new HashMap<>(); + TaskAssignResult lastFailure = null; + for (TaskConfig task : tasks) { + + // Dedup + if (assignResults.containsKey(task.getId())) { + logger.warn("Duplicated task assignment {}", task); + continue; + } + + // Every time we try to assign the task to the least-used instance, if that fails, + // we assume all subsequent tasks will fail with same reason + if (lastFailure != null) { + assignResults.put(task.getId(), + new TaskAssignResult(task, null, false, lastFailure.getFitnessScore(), + lastFailure.getFailureReason(), lastFailure.getFailureDescription())); + continue; + } + + // Try to assign the task to least used instance + AssignableInstance instance = queue.poll(); + TaskAssignResult result = instance.tryAssign(task); + assignResults.put(task.getId(), result); + + if (!result.isSuccessful()){ + // For all failure reasons other than duplicated assignment, we can fail + // subsequent tasks + lastFailure = result; + } else { + // If the task is successfully accepted by the instance, assign it to the instance + instance.assign(result); + + // requeue the instance to rank again + queue.offer(instance); + } + } + logger.info("Finished assigning tasks with quota type {}", quotaType); + return assignResults; + } + + private PriorityQueue<AssignableInstance> buildSchedQueue(String quotaType, + Iterable<AssignableInstance> instances) { + AssignableInstanceComparator comparator = new AssignableInstanceComparator(quotaType); + PriorityQueue<AssignableInstance> queue = + new PriorityQueue<>(SCHED_QUEUE_INIT_CAPACITY, comparator); + for (AssignableInstance assignableInstance : instances) { + queue.offer(assignableInstance); + } + return queue; + } + + private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskConfig> tasks) { + Map<String, TaskAssignResult> result = new HashMap<>(); + for (TaskConfig taskConfig : tasks) { + result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, null, false, 0, + TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, "No assignable instance to assign")); + } + return result; + } + + private class AssignableInstanceComparator implements Comparator<AssignableInstance> { + + /** + * Resource type this comparator needs to compare + */ + private final String RESOURCE_TYPE = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); + + /** + * Resource quota type this comparator needs to compare + */ + private final String _quotaType; + + public AssignableInstanceComparator(String quotaType) { + _quotaType = quotaType; + } + + /** + * Using this comparator, AssignableInstance will be sorted based on availability of + * quota given job type in the priority queue. Top of the queue will be the one with + * highest priority + * + * @return a negative integer, zero, or a positive integer as the + * first argument is less than, equal to, or greater than the + * second + */ + @Override + public int compare(AssignableInstance o1, AssignableInstance o2) { + Integer o1RemainingCapacity = getRemainingUsage(o1.getTotalCapacity(), o1.getUsedCapacity()); + Integer o2RemainingCapacity = getRemainingUsage(o2.getTotalCapacity(), o2.getUsedCapacity()); + return o2RemainingCapacity - o1RemainingCapacity; + } + + private Integer getRemainingUsage(Map<String, Map<String, Integer>> capacity, + Map<String, Map<String, Integer>> used) { + if (capacity.containsKey(RESOURCE_TYPE) && capacity.get(RESOURCE_TYPE) + .containsKey(_quotaType)) { + return capacity.get(RESOURCE_TYPE).get(_quotaType) - used.get(RESOURCE_TYPE) + .get(_quotaType); + } + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java new file mode 100644 index 0000000..4030df7 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java @@ -0,0 +1,69 @@ +package org.apache.helix.task.assigner; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.TaskConfig; + +/* package */ class AssignerTestBase { + + private static final String testClusterName = "testCluster"; + static final String testInstanceName = "testInstance"; + + static final String[] testResourceTypes = new String[] {"Resource1", "Resource2", "Resource3"}; + static final String[] testResourceCapacity = new String[] {"20", "50", "100"}; + + static final String[] testQuotaTypes = new String[] {"Type1", "Type2", "Type3"}; + static final String[] testQuotaRatio = new String[] {"50", "30", "20"}; + private static final String defaultQuotaRatio = "100"; + + /* package */ LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity) { + return createLiveInstance(resourceTypes, resourceCapacity, testInstanceName); + } + + /* package */ LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity, String instancename) { + LiveInstance li = new LiveInstance(instancename); + if (resourceCapacity != null && resourceTypes != null) { + Map<String, String> resMap = new HashMap<>(); + for (int i = 0; i < resourceCapacity.length; i++) { + resMap.put(resourceTypes[i], resourceCapacity[i]); + } + li.setResourceCapacityMap(resMap); + } + return li; + } + + /* package */ ClusterConfig createClusterConfig(String[] quotaTypes, String[] quotaRatio, + boolean addDefaultQuota) { + ClusterConfig clusterConfig = new ClusterConfig(testClusterName); + if (quotaTypes != null && quotaRatio != null) { + for (int i = 0; i < quotaTypes.length; i++) { + clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]); + } + } + if (addDefaultQuota) { + clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio); + } + return clusterConfig; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java index 9b5974a..f1c92e3 100644 --- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java @@ -29,17 +29,7 @@ import org.apache.helix.task.TaskStateModelFactory; import org.testng.Assert; import org.testng.annotations.Test; -public class TestAssignableInstance { - private static final String testClusterName = "testCluster"; - private static final String testInstanceName = "testInstance"; - - private static final String[] testResourceTypes = new String[] {"Resource1", "Resource2", "Resource3"}; - private static final String[] testResourceCapacity = new String[] {"20", "50", "100"}; - - private static final String[] testQuotaTypes = new String[] {"Type1", "Type2", "Type3"}; - private static final String[] testQuotaRatio = new String[] {"50", "30", "20"}; - private static final String defaultQuotaRatio = "100"; - +public class TestAssignableInstance extends AssignerTestBase { @Test public void testInvalidInitialization() { @@ -330,30 +320,4 @@ public class TestAssignableInstance { } return expectedQuotaPerType; } - - private LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity) { - LiveInstance li = new LiveInstance(testInstanceName); - if (resourceCapacity != null && resourceTypes != null) { - Map<String, String> resMap = new HashMap<>(); - for (int i = 0; i < resourceCapacity.length; i++) { - resMap.put(resourceTypes[i], resourceCapacity[i]); - } - li.setResourceCapacityMap(resMap); - } - return li; - } - - private ClusterConfig createClusterConfig(String[] quotaTypes, String[] quotaRatio, - boolean addDefaultQuota) { - ClusterConfig clusterConfig = new ClusterConfig(testClusterName); - if (quotaTypes != null && quotaRatio != null) { - for (int i = 0; i < quotaTypes.length; i++) { - clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]); - } - } - if (addDefaultQuota) { - clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio); - } - return clusterConfig; - } } http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java new file mode 100644 index 0000000..ec8753c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java @@ -0,0 +1,206 @@ +package org.apache.helix.task.assigner; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.TaskConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestThreadCountBasedTaskAssigner extends AssignerTestBase { + + @Test + public void testSuccessfulAssignment() { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + int taskCountPerType = 150; + int instanceCount = 20; + int threadCount = 50; + List<AssignableInstance> instances = createAssignableInstances(instanceCount, threadCount); + + for (String quotaType : testQuotaTypes) { + // Create tasks + List<TaskConfig> tasks = createTaskConfigs(taskCountPerType, quotaType); + + // Assign + Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks); + + // Check success + assertAssignmentResults(results.values(), true); + + // Check evenness + for (AssignableInstance instance : instances) { + int assignedCount = instance.getUsedCapacity() + .get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).get(quotaType); + Assert.assertTrue(assignedCount <= taskCountPerType / instanceCount + 1 + && assignedCount >= taskCountPerType / instanceCount); + } + } + } + + @Test + public void testAssignmentFailureNoInstance() { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + int taskCount = 10; + List<TaskConfig> tasks = createTaskConfigs(taskCount, "Dummy"); + Map<String, TaskAssignResult> results = + assigner.assignTasks(Collections.<AssignableInstance>emptyList(), tasks); + Assert.assertEquals(results.size(), taskCount); + for (TaskAssignResult result : results.values()) { + Assert.assertFalse(result.isSuccessful()); + Assert.assertNull(result.getAssignableInstance()); + Assert.assertEquals(result.getFailureReason(), + TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA); + } + } + + @Test + public void testAssignmentFailureNoTask() { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + List<AssignableInstance> instances = createAssignableInstances(1, 10); + Map<String, TaskAssignResult> results = + assigner.assignTasks(instances, Collections.<TaskConfig>emptyList()); + Assert.assertTrue(results.isEmpty()); + } + + @Test + public void testAssignmentFailureInsufficientQuota() { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + + // 10 * Type1 quota + List<AssignableInstance> instances = createAssignableInstances(2, 10); + List<TaskConfig> tasks = createTaskConfigs(20, testQuotaTypes[0]); + + Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks); + int successCnt = 0; + int failCnt = 0; + for (TaskAssignResult rst : results.values()) { + if (rst.isSuccessful()) { + successCnt += 1; + } else { + failCnt += 1; + Assert.assertEquals(rst.getFailureReason(), + TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA); + } + } + Assert.assertEquals(successCnt, 10); + Assert.assertEquals(failCnt, 10); + } + + @Test + public void testAssignmentFailureDuplicatedTask() { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + List<AssignableInstance> instances = createAssignableInstances(1, 20); + List<TaskConfig> tasks = createTaskConfigs(10, testQuotaTypes[0], false); + + // Duplicate all tasks + tasks.addAll(createTaskConfigs(10, testQuotaTypes[0], false)); + Collections.shuffle(tasks); + + Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks); + Assert.assertEquals(results.size(), 10); + assertAssignmentResults(results.values(), true); + } + + @Test(enabled = false, description = "Not enabling profiling tests") + public void testAssignerProfiling() { + int instanceCount = 1000; + int taskCount = 50000; + for (int batchSize : new int[] {10000, 5000, 2000, 1000, 500, 100}) { + System.out.println("testing batch size: " + batchSize); + profileAssigner(batchSize, instanceCount, taskCount); + } + } + + private void profileAssigner(int assignBatchSize, int instanceCount, int taskCount) { + int trail = 100; + long totalTime = 0; + for (int i = 0; i < trail; i++) { + TaskAssigner assigner = new ThreadCountBasedTaskAssigner(); + + // 50 * instanceCount number of tasks + List<AssignableInstance> instances = createAssignableInstances(instanceCount, 100); + List<TaskConfig> tasks = createTaskConfigs(taskCount, testQuotaTypes[0]); + List<Map<String, TaskAssignResult>> allResults = new ArrayList<>(); + + // Assign + long start = System.currentTimeMillis(); + for (int j = 0; j < taskCount / assignBatchSize; j++) { + allResults.add(assigner + .assignTasks(instances, tasks.subList(j * assignBatchSize, (j + 1) * assignBatchSize))); + } + long duration = System.currentTimeMillis() - start; + totalTime += duration; + + // Validate + for (Map<String, TaskAssignResult> results : allResults) { + for (TaskAssignResult rst : results.values()) { + Assert.assertTrue(rst.isSuccessful()); + } + } + } + System.out.println("Average time: " + totalTime / trail + "ms"); + } + + private void assertAssignmentResults(Iterable<TaskAssignResult> results, boolean expected) { + for (TaskAssignResult rst : results) { + Assert.assertEquals(rst.isSuccessful(), expected); + } + } + + private List<TaskConfig> createTaskConfigs(int count, String quotaType) { + return createTaskConfigs(count, quotaType, true); + } + + private List<TaskConfig> createTaskConfigs(int count, String quotaType, boolean randomID) { + List<TaskConfig> tasks = new ArrayList<>(); + for (int i = 0; i < count; i++) { + TaskConfig task = + new TaskConfig(null, null, randomID ? UUID.randomUUID().toString() : "task-" + i, null); + task.setQuotaType(quotaType); + tasks.add(task); + } + return tasks; + } + + private List<AssignableInstance> createAssignableInstances(int count, int threadCount) { + List<AssignableInstance> instances = new ArrayList<>(); + String instanceNameFormat = "instance-%s"; + for (int i = 0; i < count; i++) { + String instanceName = String.format(instanceNameFormat, i); + instances.add( + new AssignableInstance( + createClusterConfig(testQuotaTypes, testQuotaRatio, false), + new InstanceConfig(instanceName), + createLiveInstance( + new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() }, + new String[] { Integer.toString(threadCount) }, + instanceName) + ) + ); + } + return instances; + } +}
