Repository: helix Updated Branches: refs/heads/master a9cdecc64 -> e44b29e03
[HELIX-718] provide a method in AssignableInstance to set current assignments Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e44b29e0 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e44b29e0 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e44b29e0 Branch: refs/heads/master Commit: e44b29e03ef4c807e940cde717ed2f6fff58a273 Parents: a9cdecc Author: Harry Zhang <[email protected]> Authored: Mon Jul 9 15:59:27 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Mon Jul 9 15:59:27 2018 -0700 ---------------------------------------------------------------------- .../helix/task/assigner/AssignableInstance.java | 45 ++++++++++++++++++-- .../task/assigner/TestAssignableInstance.java | 25 +++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java index 5d252eb..8883987 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java @@ -186,6 +186,12 @@ public class AssignableInstance { } } + /** + * Update this AssignableInstance with new configs + * @param clusterConfig cluster config + * @param instanceConfig instance config + * @param liveInstance live instance object + */ public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig, LiveInstance liveInstance) { logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName()); @@ -319,13 +325,22 @@ public class AssignableInstance { _currentAssignments.add(result.getTaskConfig().getId()); // update resource usage + // TODO (harry): get requested resource type from task config String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); String quotaType = result.getTaskConfig().getQuotaType(); - // Assume used capacity is updated, and if resource type / quota type is not supported - // we have already failed the assignment - int curUsage = _usedCapacity.get(resourceType).get(quotaType); - _usedCapacity.get(resourceType).put(quotaType, curUsage + 1); + // Resource type / quota type might have already changed, i.e. we are recovering + // current assignments for a live instance, but currently running tasks's quota + // type has already been removed by user. So we do the deduction with best effort + if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType) + .containsKey(quotaType)) { + int curUsage = _usedCapacity.get(resourceType).get(quotaType); + _usedCapacity.get(resourceType).put(quotaType, curUsage + 1); + } else { + logger.warn( + "Task's requested resource type and quota type is no longer supported. TaskConfig: %s; UsedCapacity: %s", + result.getTaskConfig(), _usedCapacity); + } logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(), _instanceConfig.getInstanceName()); @@ -359,6 +374,28 @@ public class AssignableInstance { } /** + * This method is used for forcing AssignableInstance to match current assignment state. It + * returns with TaskAssignResult for proper release current assignments when they are finished. + * @param tasks taskId -> taskConfig mapping + * @return taskId -> TaskAssignResult mapping + */ + public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, TaskConfig> tasks) { + Map<String, TaskAssignResult> assignment = new HashMap<>(); + for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) { + TaskAssignResult assignResult = + new TaskAssignResult(entry.getValue(), this, true, fitnessScoreFactor, null, + "Recovered TaskAssignResult from current state"); + try { + assign(assignResult); + assignment.put(entry.getKey(), assignResult); + } catch (IllegalStateException e) { + logger.error("Failed to set current assignment for task {}.", entry.getValue().getId(), e); + } + } + return assignment; + } + + /** * Returns a set of taskIDs */ public Set<String> getCurrentAssignments() { http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java index 02a0d39..9b5974a 100644 --- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java @@ -281,6 +281,31 @@ public class TestAssignableInstance { .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA); } + @Test + public void testSetCurrentAssignment() { + AssignableInstance ai = + new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true), + new InstanceConfig(testInstanceName), createLiveInstance( + new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() }, + new String[] { "40" })); + + Map<String, TaskConfig> currentAssignments = new HashMap<>(); + currentAssignments.put("supportedTask", new TaskConfig("", null, "supportedTask", "")); + TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", ""); + unsupportedTask.setQuotaType("UnsupportedQuotaType"); + currentAssignments.put("unsupportedTask", unsupportedTask); + + Map<String, TaskAssignResult> results = ai.setCurrentAssignments(currentAssignments); + for (TaskAssignResult rst : results.values()) { + Assert.assertTrue(rst.isSuccessful()); + Assert.assertEquals(rst.getAssignableInstance(), ai); + } + Assert.assertEquals(ai.getCurrentAssignments().size(), 2); + Assert.assertEquals( + (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()) + .get(TaskConfig.DEFAULT_QUOTA_TYPE), 1); + } + private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) { Map<String, Integer> ret = new HashMap<>(); for (int i = 0; i < types.length; i++) {
