Repository: helix
Updated Branches:
  refs/heads/master a9cdecc64 -> e44b29e03


[HELIX-718] provide a method in AssignableInstance to set current assignments


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e44b29e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e44b29e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e44b29e0

Branch: refs/heads/master
Commit: e44b29e03ef4c807e940cde717ed2f6fff58a273
Parents: a9cdecc
Author: Harry Zhang <[email protected]>
Authored: Mon Jul 9 15:59:27 2018 -0700
Committer: Harry Zhang <[email protected]>
Committed: Mon Jul 9 15:59:27 2018 -0700

----------------------------------------------------------------------
 .../helix/task/assigner/AssignableInstance.java | 45 ++++++++++++++++++--
 .../task/assigner/TestAssignableInstance.java   | 25 +++++++++++
 2 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 5d252eb..8883987 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -186,6 +186,12 @@ public class AssignableInstance {
     }
   }
 
+  /**
+   * Update this AssignableInstance with new configs
+   * @param clusterConfig cluster config
+   * @param instanceConfig instance config
+   * @param liveInstance live instance object
+   */
   public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig,
       LiveInstance liveInstance) {
     logger.info("Updating configs for AssignableInstance {}", 
_instanceConfig.getInstanceName());
@@ -319,13 +325,22 @@ public class AssignableInstance {
     _currentAssignments.add(result.getTaskConfig().getId());
 
     // update resource usage
+    // TODO (harry): get requested resource type from task config
     String resourceType = 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
     String quotaType = result.getTaskConfig().getQuotaType();
 
-    // Assume used capacity is updated, and if resource type / quota type is 
not supported
-    // we have already failed the assignment
-    int curUsage = _usedCapacity.get(resourceType).get(quotaType);
-    _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+    // Resource type / quota type might have already changed, i.e. we are 
recovering
+    // current assignments for a live instance, but currently running tasks's 
quota
+    // type has already been removed by user. So we do the deduction with best 
effort
+    if (_usedCapacity.containsKey(resourceType) && 
_usedCapacity.get(resourceType)
+        .containsKey(quotaType)) {
+      int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+      _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+    } else {
+      logger.warn(
+          "Task's requested resource type and quota type is no longer 
supported. TaskConfig: %s; UsedCapacity: %s",
+          result.getTaskConfig(), _usedCapacity);
+    }
 
     logger.info("Assigned task {} to instance {}", 
result.getTaskConfig().getId(),
         _instanceConfig.getInstanceName());
@@ -359,6 +374,28 @@ public class AssignableInstance {
   }
 
   /**
+   * This method is used for forcing AssignableInstance to match current 
assignment state. It
+   * returns with TaskAssignResult for proper release current assignments when 
they are finished.
+   * @param tasks taskId -> taskConfig mapping
+   * @return taskId -> TaskAssignResult mapping
+   */
+  public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, 
TaskConfig> tasks) {
+    Map<String, TaskAssignResult> assignment = new HashMap<>();
+    for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) {
+      TaskAssignResult assignResult =
+          new TaskAssignResult(entry.getValue(), this, true, 
fitnessScoreFactor, null,
+              "Recovered TaskAssignResult from current state");
+      try {
+        assign(assignResult);
+        assignment.put(entry.getKey(), assignResult);
+      } catch (IllegalStateException e) {
+        logger.error("Failed to set current assignment for task {}.", 
entry.getValue().getId(), e);
+      }
+    }
+    return assignment;
+  }
+
+  /**
    * Returns a set of taskIDs
    */
   public Set<String> getCurrentAssignments() {

http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 02a0d39..9b5974a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -281,6 +281,31 @@ public class TestAssignableInstance {
         .assertEquals(result.getFailureReason(), 
TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
   }
 
+  @Test
+  public void testSetCurrentAssignment() {
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, 
testQuotaRatio, true),
+            new InstanceConfig(testInstanceName), createLiveInstance(
+            new String[] { 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+            new String[] { "40" }));
+
+    Map<String, TaskConfig> currentAssignments = new HashMap<>();
+    currentAssignments.put("supportedTask", new TaskConfig("", null, 
"supportedTask", ""));
+    TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", 
"");
+    unsupportedTask.setQuotaType("UnsupportedQuotaType");
+    currentAssignments.put("unsupportedTask", unsupportedTask);
+
+    Map<String, TaskAssignResult> results = 
ai.setCurrentAssignments(currentAssignments);
+    for (TaskAssignResult rst : results.values()) {
+      Assert.assertTrue(rst.isSuccessful());
+      Assert.assertEquals(rst.getAssignableInstance(), ai);
+    }
+    Assert.assertEquals(ai.getCurrentAssignments().size(), 2);
+    Assert.assertEquals(
+        (int) 
ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 1);
+  }
+
   private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, 
int[] quotas) {
     Map<String, Integer> ret = new HashMap<>();
     for (int i = 0; i < types.length; i++) {

Reply via email to