Repository: helix Updated Branches: refs/heads/master 84c2feabb -> 2049f93ab
[HELIX-718] implement AssignableInstance Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2049f93a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2049f93a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2049f93a Branch: refs/heads/master Commit: 2049f93abe8e56a754e4880a9157959ef24cd89e Parents: 84c2fea Author: Harry Zhang <[email protected]> Authored: Mon Jul 9 15:49:33 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Mon Jul 9 15:49:33 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskConfig.java | 4 +- .../helix/task/assigner/AssignableInstance.java | 325 +++++++++++++++++- .../helix/task/assigner/TaskAssignResult.java | 5 +- .../helix/task/assigner/TaskAssigner.java | 4 +- .../task/assigner/TestAssignableInstance.java | 334 +++++++++++++++++++ 5 files changed, 650 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java index a447929..d3a8b34 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java @@ -42,7 +42,7 @@ public class TaskConfig { } private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class); - public static final String QUOTA_TYPE_NOT_SET = "QUOTA_TYPE_NOT_SET"; + public static final String DEFAULT_QUOTA_TYPE = "DEFAULT"; private final Map<String, String> _configMap; @@ -133,7 +133,7 @@ public class TaskConfig { */ public String getQuotaType() { return 
_configMap.containsKey(TaskConfigProperty.TASK_QUOTA_TYPE.name()) ? - _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : QUOTA_TYPE_NOT_SET; + _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : DEFAULT_QUOTA_TYPE; } /** http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java index fe59275..5d252eb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java @@ -19,44 +19,277 @@ package org.apache.helix.task.assigner; * under the License. */ +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskStateModelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * AssignableInstance contains instance capacity profile and methods that control capacity and help * with task assignment. */ public class AssignableInstance { + private static final Logger logger = LoggerFactory.getLogger(AssignableInstance.class); /** - * Caches tasks currently assigned to this instance. + * Fitness score will be calculated from 0 to 1000 + */ + private static final int fitnessScoreFactor = 1000; + + /** + * Caches IDs of tasks currently assigned to this instance. * Every pipeline iteration will compare Task states in this map to Task states in TaskDataCache. * Tasks in a terminal state (finished or failed) will be removed as soon as they reach the state. 
*/ - private Map<String, TaskAssignResult> _currentAssignments; + private Set<String> _currentAssignments; private ClusterConfig _clusterConfig; private InstanceConfig _instanceConfig; private LiveInstance _liveInstance; + /** + * A map recording instance's total capacity: + * map{resourceType : map{quotaType : quota}} + */ + private Map<String, Map<String, Integer>> _totalCapacity; + + /** + * A map recording instance's used capacity + * map{resourceType : map{quotaType : quota}} + */ + private Map<String, Map<String, Integer>> _usedCapacity; + public AssignableInstance(ClusterConfig clusterConfig, InstanceConfig instanceConfig, LiveInstance liveInstance) { + if (clusterConfig == null || instanceConfig == null || liveInstance == null) { + throw new IllegalArgumentException( + "ClusterConfig, InstanceConfig, LiveInstance cannot be null!"); + } + + if (!instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) { + throw new IllegalArgumentException(String + .format("Instance name from LiveInstance (%s) and InstanceConfig (%s) don't match!", + liveInstance.getInstanceName(), instanceConfig.getInstanceName())); + } _clusterConfig = clusterConfig; _instanceConfig = instanceConfig; _liveInstance = liveInstance; + + _currentAssignments = new HashSet<>(); + _totalCapacity = new HashMap<>(); + _usedCapacity = new HashMap<>(); + refreshTotalCapacity(); + } + + /** + * When task quota ratio / instance's resource capacity change, we need to update instance + * capacity cache. Couple of corner cases to clarify for updating capacity: + * 1. User shrinks capacity and used capacity exceeds total capacity - current assignment + * will not be affected (used > total is ok) but no further assignment decision will + * be made on this instance until spaces get freed up + * 2. 
User removed a quotaType but there are still tasks with stale quota type assigned on + * this instance - current assignment will not be affected, and further assignment will + * NOT be made for stale quota type + * 3. User removed a resourceType but there are still tasks with stale resource type assigned + * on this instance - current assignment will not be affected, but no further assignment + * with stale resource type request will be allowed on this instance + */ + private void refreshTotalCapacity() { + // Create a temp total capacity record in case we fail to parse configurations, we + // still retain existing source of truth + Map<String, Map<String, Integer>> tempTotalCapacity = new HashMap<>(); + Map<String, String> typeQuotaRatio = _clusterConfig.getTaskQuotaRatioMap(); + Map<String, String> resourceCapacity = _liveInstance.getResourceCapacityMap(); + + if (resourceCapacity == null) { + resourceCapacity = new HashMap<>(); + resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(), + Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE)); + logger.info("No resource capacity provided in LiveInstance {}, assuming default capacity: {}", + _instanceConfig.getInstanceName(), resourceCapacity); + } + + if (typeQuotaRatio == null) { + typeQuotaRatio = new HashMap<>(); + typeQuotaRatio.put(TaskConfig.DEFAULT_QUOTA_TYPE, Integer.toString(1)); + logger.info("No quota type ratio provided in LiveInstance {}, assuming default ratio: {}", + _instanceConfig.getInstanceName(), typeQuotaRatio); + } + + logger.info( + "Updating capacity for AssignableInstance {}. 
Resource Capacity: {}; Type Quota Ratio: {}", + _instanceConfig.getInstanceName(), resourceCapacity, typeQuotaRatio); + + // Reconcile current and new resource types + try { + for (final Map.Entry<String, String> resEntry : resourceCapacity.entrySet()) { + String resourceType = resEntry.getKey(); + int capacity = Integer.valueOf(resEntry.getValue()); + + if (!_totalCapacity.containsKey(resourceType)) { + logger.info("Adding InstanceResourceType {}", resourceType); + _usedCapacity.put(resourceType, new HashMap<String, Integer>()); + } + tempTotalCapacity.put(resourceType, new HashMap<String, Integer>()); + + int totalRatio = 0; + for (String val : typeQuotaRatio.values()) { + totalRatio += Integer.valueOf(val); + } + + // Setup per-type resource quota based on given total capacity + for (Map.Entry<String, String> typeQuotaEntry : typeQuotaRatio.entrySet()) { + // Calculate total quota for a given type + String quotaType = typeQuotaEntry.getKey(); + int quotaRatio = Integer.valueOf(typeQuotaEntry.getValue()); + int quota = Math.round(capacity * (float)quotaRatio / (float)totalRatio); + + // Honor non-zero quota ratio for non-zero capacity even if it is rounded to zero + if (capacity != 0 && quotaRatio != 0 && quota == 0) { + quota = 1; + } + + // record total quota of the resource + tempTotalCapacity.get(resourceType).put(quotaType, quota); + + // Add quota for new quota type + if (!_usedCapacity.get(resourceType).containsKey(quotaType)) { + logger.info("Adding QuotaType {} for resource {}", quotaType, resourceType); + _usedCapacity.get(resourceType).put(quotaType, 0); + } + } + + // For removed quota type, remove record from used capacity + _usedCapacity.get(resourceType).keySet().retainAll(typeQuotaRatio.keySet()); + } + + // Update total capacity map + _totalCapacity = tempTotalCapacity; + + // Purge used capacity for resource deleted + _usedCapacity.keySet().retainAll(resourceCapacity.keySet()); + + logger.info( + "Finished updating capacity for 
AssignableInstance {}. Current capacity {}. Current usage: {}", + _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity); + } catch (Exception e) { + // TODO: properly escalate error + logger.error( + "Failed to update capacity for Assignableinstance {}, still using current capacity {}. Current usage: {}", + _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity, e); + } + } + + public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig, + LiveInstance liveInstance) { + logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName()); + boolean refreshCapacity = false; + if (clusterConfig != null) { + if (!clusterConfig.getTaskQuotaRatioMap().equals(_clusterConfig.getTaskQuotaRatioMap())) { + refreshCapacity = true; + } + _clusterConfig = clusterConfig; + logger.info("Updated cluster config"); + } + + if (liveInstance != null) { + if (!_instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) { + logger.error( + "Cannot update live instance with different instance name. Current: {}; new: {}", + _instanceConfig.getInstanceName(), liveInstance.getInstanceName()); + } else { + if (!liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap())) { + refreshCapacity = true; + } + _liveInstance = liveInstance; + logger.info("Updated live instance"); + } + } + + if (instanceConfig != null) { + if (!_instanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())) { + logger.error( + "Cannot update instance config with different instance name. 
Current: {}; new: {}", + _instanceConfig.getInstanceName(), instanceConfig.getInstanceName()); + } else { + _instanceConfig = instanceConfig; + logger.info("Updated instance config"); + } + } + + if (refreshCapacity) { + refreshTotalCapacity(); + } + + logger.info("Updated configs for AssignableInstance {}", _instanceConfig.getInstanceName()); } /** * Tries to assign the given task on this instance and returns TaskAssignResult. Instance capacity * profile is NOT modified by tryAssign. - * @param task - * @return + * + * When calculating fitness of an assignment, this function will rate assignment from 0 to 1000, + * and the assignment that has a higher score will be a better fit. + * + * @param task task config + * @return TaskAssignResult + * @throws IllegalArgumentException if task is null */ - public TaskAssignResult tryAssign(TaskConfig task) { - // TODO: implement - return null; + public TaskAssignResult tryAssign(TaskConfig task) throws IllegalArgumentException { + if (task == null) { + throw new IllegalArgumentException("Task is null!"); + } + + if (_currentAssignments.contains(task.getId())) { + return new TaskAssignResult(task, this, false, 0, + TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED, String + .format("Task %s is already assigned to this instance. Need to release it first", + task.getId())); + } + + // For now we only have 1 type of resource so just hard code it here + String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); + + // Fail when no such resource type + if (!_totalCapacity.containsKey(resourceType)) { + return new TaskAssignResult(task, this, false, 0, + TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE, String + .format("Requested resource type %s not supported. 
Available resource types: %s", + resourceType, _totalCapacity.keySet())); + } + + String quotaType = task.getQuotaType(); + + // Fail when no such quota type + if (!_totalCapacity.get(resourceType).containsKey(quotaType)) { + return new TaskAssignResult(task, this, false, 0, + TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE, String + .format("Requested quota type %s not defined. Available quota types: %s", quotaType, + _totalCapacity.get(resourceType).keySet())); + } + + int capacity = _totalCapacity.get(resourceType).get(quotaType); + int usage = _usedCapacity.get(resourceType).get(quotaType); + + // Fail with insufficient quota + if (capacity <= usage) { + return new TaskAssignResult(task, this, false, 0, + TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, String + .format("Insufficient quota %s::%s. Capacity: %s, Current Usage: %s", resourceType, + quotaType, capacity, usage)); + } + + // More remaining capacity leads to higher fitness score + int fitness = Math.round((float)(capacity - usage) / capacity * fitnessScoreFactor); + + return new TaskAssignResult(task, this, true, fitness, + null, ""); } /** @@ -64,29 +297,71 @@ public class AssignableInstance { * 1. Deduct the amount of resource required by this task * 2. Add this TaskAssignResult to _currentAssignments * @param result - * @throws IllegalStateException if TaskAssignResult is not successful + * @throws IllegalStateException if TaskAssignResult is not successful or the task is double + * assigned, or the task is not assigned to this instance */ public void assign(TaskAssignResult result) throws IllegalStateException { - // TODO: implement - return; + if (!result.isSuccessful()) { + throw new IllegalStateException("Cannot assign a failed result: " + result); + } + + if (!result.getInstanceName().equals(getInstanceName())) { + throw new IllegalStateException(String.format( + "Cannot assign a result for a different instance. 
This instance: %s; Result: %s", + getInstanceName(), result)); + } + + if (_currentAssignments.contains(result.getTaskConfig().getId())) { + throw new IllegalStateException( + "Cannot double assign task " + result.getTaskConfig().getId()); + } + + _currentAssignments.add(result.getTaskConfig().getId()); + + // update resource usage + String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); + String quotaType = result.getTaskConfig().getQuotaType(); + + // Assume used capacity is updated, and if resource type / quota type is not supported + // we have already failed the assignment + int curUsage = _usedCapacity.get(resourceType).get(quotaType); + _usedCapacity.get(resourceType).put(quotaType, curUsage + 1); + + logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(), + _instanceConfig.getInstanceName()); } /** * Performs the following to release resource for a task: * 1. Release the resource by adding back what the task required. * 2. Remove the TaskAssignResult from _currentAssignments - * @param taskID - * @throws IllegalArgumentException if task is not found + * @param taskConfig config of this task */ - public void release(String taskID) throws IllegalArgumentException { - // TODO: implement - return; + public void release(TaskConfig taskConfig) { + if (!_currentAssignments.contains(taskConfig.getId())) { + logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(), + _instanceConfig.getInstanceName()); + return; + } + String quotaType = taskConfig.getQuotaType(); + String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); + + // We might be releasing a task whose resource requirement / quota type is out-dated, + // thus we need to check to avoid NPE + if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType) + .containsKey(quotaType)) { + int curUsage = _usedCapacity.get(resourceType).get(quotaType); + _usedCapacity.get(resourceType).put(quotaType, curUsage - 
1); + } + _currentAssignments.remove(taskConfig.getId()); + logger.info("Released task {} from instance {}", taskConfig.getId(), + _instanceConfig.getInstanceName()); } /** - * Returns taskID -> TaskAssignResult mappings. + * Returns a set of taskIDs */ - public Map<String, TaskAssignResult> getCurrentAssignments() { + public Set<String> getCurrentAssignments() { return _currentAssignments; } @@ -96,4 +371,20 @@ public class AssignableInstance { public String getInstanceName() { return _instanceConfig.getInstanceName(); } + + /** + * Returns total capacity of the AssignableInstance + * @return map{resourceType : map{quotaType : quota}} + */ + public Map<String, Map<String, Integer>> getTotalCapacity() { + return _totalCapacity; + } + + /** + * Returns used capacity of the AssignableInstance + * @return map{resourceType : map{quotaType : usedQuota}} + */ + public Map<String, Map<String, Integer>> getUsedCapacity() { + return _usedCapacity; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java index 6d7b4e8..00d7db1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java @@ -34,7 +34,10 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> { NO_SUCH_RESOURCE_TYPE, // Required quota type is not configured - NO_SUCH_QUOTA_TYPE + NO_SUCH_QUOTA_TYPE, + + // Task cannot be assigned twice on a node without releasing it first + TASK_ALREADY_ASSIGNED } private final boolean _isAssignmentSuccessful; 
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java index 66614a2..79fbd64 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java @@ -28,8 +28,8 @@ public interface TaskAssigner { * Assign a collection of tasks on a collection of assignableInstances. * When an assignment decision is made, AssignableInstance.assign() must be called for the * instance to modify its internal capacity profile. - * @param assignableInstances - * @param tasks + * @param assignableInstances the AssignableInstances that tasks can be assigned to + * @param tasks String -> TaskConfig * @return taskID -> TaskAssignmentResult mapping per task */ Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances, http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java new file mode 100644 index 0000000..02a0d39 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java @@ -0,0 +1,334 @@ +package org.apache.helix.task.assigner; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskStateModelFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestAssignableInstance { + private static final String testClusterName = "testCluster"; + private static final String testInstanceName = "testInstance"; + + private static final String[] testResourceTypes = new String[] {"Resource1", "Resource2", "Resource3"}; + private static final String[] testResourceCapacity = new String[] {"20", "50", "100"}; + + private static final String[] testQuotaTypes = new String[] {"Type1", "Type2", "Type3"}; + private static final String[] testQuotaRatio = new String[] {"50", "30", "20"}; + private static final String defaultQuotaRatio = "100"; + + + @Test + public void testInvalidInitialization() { + try { + AssignableInstance ai = new AssignableInstance(null, null, null); + Assert.fail("Expecting IllegalArgumentException"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("cannot be null")); + } + + try { + ClusterConfig clusterConfig = new ClusterConfig("testCluster"); + InstanceConfig instanceConfig = new InstanceConfig("instance"); + LiveInstance 
liveInstance = new LiveInstance("another-instance"); + AssignableInstance ai = new AssignableInstance(clusterConfig, instanceConfig, liveInstance); + Assert.fail("Expecting IllegalArgumentException"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("don't match")); + } + } + + @Test + public void testInitializationWithQuotaUnset() { + // Initialize AssignableInstance with neither resource capacity nor quota ratio provided + AssignableInstance ai = new AssignableInstance( + createClusterConfig(null, null, false), + new InstanceConfig(testInstanceName), + createLiveInstance(null, null) + ); + Assert.assertEquals(ai.getUsedCapacity().size(), 1); + Assert.assertEquals( + (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()) + .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0); + Assert.assertEquals( + (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()) + .get(TaskConfig.DEFAULT_QUOTA_TYPE), TaskStateModelFactory.TASK_THREADPOOL_SIZE); + Assert.assertEquals(ai.getCurrentAssignments().size(), 0); + } + + @Test + public void testInitializationWithOnlyCapacity() { + // Initialize AssignableInstance with only resource capacity provided + AssignableInstance ai = new AssignableInstance( + createClusterConfig(null, null, false), + new InstanceConfig(testInstanceName), + createLiveInstance(testResourceTypes, testResourceCapacity) + ); + Assert.assertEquals(ai.getTotalCapacity().size(), testResourceTypes.length); + Assert.assertEquals(ai.getUsedCapacity().size(), testResourceTypes.length); + for (int i = 0; i < testResourceTypes.length; i++) { + Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1); + Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1); + Assert.assertEquals( + ai.getTotalCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE), + Integer.valueOf(testResourceCapacity[i]) + ); + Assert.assertEquals( 
+ ai.getUsedCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE), + Integer.valueOf(0) + ); + } + } + + @Test + public void testInitializationWithOnlyQuotaType() { + // Initialize AssignableInstance with only quota type provided + AssignableInstance ai = new AssignableInstance( + createClusterConfig(testQuotaTypes, testQuotaRatio, false), + new InstanceConfig(testInstanceName), + createLiveInstance(null, null) + ); + + Assert.assertEquals(ai.getTotalCapacity().size(), 1); + Assert.assertEquals(ai.getUsedCapacity().size(), 1); + Assert.assertEquals( + ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(), + testQuotaTypes.length + ); + Assert.assertEquals( + ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(), + testQuotaTypes.length + ); + Assert.assertEquals( + ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()), + calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes, + testQuotaRatio)); + Assert.assertEquals(ai.getCurrentAssignments().size(), 0); + } + + @Test + public void testInitializationWithQuotaAndCapacity() { + // Initialize AssignableInstance with both capacity and quota type provided + AssignableInstance ai = new AssignableInstance( + createClusterConfig(testQuotaTypes, testQuotaRatio, false), + new InstanceConfig(testInstanceName), + createLiveInstance(testResourceTypes, testResourceCapacity) + ); + + Map<String, Integer> usedResourcePerType = + createResourceQuotaPerTypeMap(testQuotaTypes, new int[] { 0, 0, 0 }); + for (int i = 0; i < testResourceTypes.length; i++) { + Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]), + calculateExpectedQuotaPerType(Integer.valueOf(testResourceCapacity[i]), testQuotaTypes, + testQuotaRatio)); + Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]), usedResourcePerType); + } + } + + @Test + public void 
testAssignableInstanceUpdateConfigs() { + AssignableInstance ai = new AssignableInstance( + createClusterConfig(testQuotaTypes, testQuotaRatio, false), + new InstanceConfig(testInstanceName), + createLiveInstance(testResourceTypes, testResourceCapacity) + ); + + String[] newResources = new String[] {"Resource2", "Resource3", "Resource4"}; + String[] newResourceCapacities = new String[] {"100", "150", "50"}; + + String[] newTypes = new String[] {"Type3", "Type4", "Type5", "Type6"}; + String[] newTypeRatio = new String[] {"20", "40", "25", "25"}; + + LiveInstance newLiveInstance = createLiveInstance(newResources, newResourceCapacities); + ClusterConfig newClusterConfig = createClusterConfig(newTypes, newTypeRatio, false); + ai.updateConfigs(newClusterConfig, null, newLiveInstance); + + Assert.assertEquals(ai.getUsedCapacity().size(), newResourceCapacities.length); + Assert.assertEquals(ai.getTotalCapacity().size(), newResourceCapacities.length); + + for (int i = 0; i < newResources.length; i++) { + Assert.assertEquals(ai.getTotalCapacity().get(newResources[i]), + calculateExpectedQuotaPerType(Integer.valueOf(newResourceCapacities[i]), newTypes, + newTypeRatio)); + Assert.assertEquals(ai.getUsedCapacity().get(newResources[i]), + createResourceQuotaPerTypeMap(newTypes, new int[] { 0, 0, 0, 0 })); + } + } + + @Test + public void testNormalTryAssign() { + AssignableInstance ai = new AssignableInstance( + createClusterConfig(null, null, true), + new InstanceConfig(testInstanceName), + createLiveInstance(null, null) + ); + + // When nothing is configured, we should use default quota to assign + Map<String, TaskAssignResult> results = new HashMap<>(); + for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) { + String taskId = Integer.toString(i); + TaskConfig task = new TaskConfig("", null, taskId, null); + TaskAssignResult result = ai.tryAssign(task); + Assert.assertTrue(result.isSuccessful()); + ai.assign(result); + results.put(taskId, result); + } + + // 
We are out of quota now and we should not be able to assign + String taskId = "TaskCannotAssign"; + TaskConfig task = new TaskConfig("", null, taskId, null); + TaskAssignResult result = ai.tryAssign(task); + Assert.assertFalse(result.isSuccessful()); + Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA); + try { + ai.assign(result); + Assert.fail("Expecting IllegalStateException"); + } catch (IllegalStateException e) { + // OK + } + + // After releasing 1 task, we should be able to schedule + ai.release(results.get("1").getTaskConfig()); + result = ai.tryAssign(task); + Assert.assertTrue(result.isSuccessful()); + + // release all tasks, check remaining resources + for (TaskAssignResult rst : results.values()) { + ai.release(rst.getTaskConfig()); + } + + Assert.assertEquals( + (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()) + .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0); + } + + @Test + public void testTryAssignFailure() { + AssignableInstance ai = + new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false), + new InstanceConfig(testInstanceName), + createLiveInstance(testResourceTypes, testResourceCapacity)); + + // No such resource type + String taskId = "testTask"; + TaskConfig task = new TaskConfig("", null, taskId, ""); + TaskAssignResult result = ai.tryAssign(task); + Assert.assertFalse(result.isSuccessful()); + Assert.assertEquals(result.getFailureReason(), + TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE); + + // No such quota type + ai.updateConfigs(null, null, createLiveInstance( + new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() }, + new String[] { "1" })); + + result = ai.tryAssign(task); + Assert.assertFalse(result.isSuccessful()); + Assert + .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE); + + ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, 
null); + + task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE); + result = ai.tryAssign(task); + Assert.assertTrue(result.isSuccessful()); + ai.assign(result); + try { + ai.assign(result); + Assert.fail("Expecting IllegalArgumentException"); + } catch (IllegalStateException e) { + // OK + } + + // Duplicate assignment + result = ai.tryAssign(task); + Assert.assertFalse(result.isSuccessful()); + Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED); + + // Insufficient quota + ai.release(task); + ai.updateConfigs(null, null, createLiveInstance( + new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() }, + new String[] { "0" })); + + task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE); + result = ai.tryAssign(task); + Assert.assertFalse(result.isSuccessful()); + Assert + .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA); + } + + private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) { + Map<String, Integer> ret = new HashMap<>(); + for (int i = 0; i < types.length; i++) { + ret.put(types[i], quotas[i]); + } + return ret; + } + + private Map<String, Integer> calculateExpectedQuotaPerType(int capacity, String[] quotaTypes, + String[] quotaRatios) { + Integer totalQuota = 0; + Map<String, Integer> expectedQuotaPerType = new HashMap<>(); + + for (String ratio : quotaRatios) { + totalQuota += Integer.valueOf(ratio); + } + + for (int i = 0; i < quotaRatios.length; i++) { + expectedQuotaPerType.put(quotaTypes[i], + Math.round((float)capacity * Integer.valueOf(quotaRatios[i]) + / totalQuota)); + } + return expectedQuotaPerType; + } + + private LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity) { + LiveInstance li = new LiveInstance(testInstanceName); + if (resourceCapacity != null && resourceTypes != null) { + Map<String, String> resMap = new HashMap<>(); + for (int i = 0; i < resourceCapacity.length; i++) { + 
resMap.put(resourceTypes[i], resourceCapacity[i]); + } + li.setResourceCapacityMap(resMap); + } + return li; + } + + private ClusterConfig createClusterConfig(String[] quotaTypes, String[] quotaRatio, + boolean addDefaultQuota) { + ClusterConfig clusterConfig = new ClusterConfig(testClusterName); + if (quotaTypes != null && quotaRatio != null) { + for (int i = 0; i < quotaTypes.length; i++) { + clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]); + } + } + if (addDefaultQuota) { + clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio); + } + return clusterConfig; + } +}
