Repository: helix
Updated Branches:
  refs/heads/master 4511dbc6c -> 034424cc4


[HELIX-720] [TASK] Implement AssignableInstanceManager

AssignableInstanceManager supports job quotas in Task Framework by (1)
re-creating the AssignableInstance map with correct resource usage based on
TaskContexts and (2) providing an update API that refreshes instances and
configs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/034424cc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/034424cc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/034424cc

Branch: refs/heads/master
Commit: 034424cc4852bda55e74ddbbf42db4c7f293262c
Parents: 4511dbc
Author: Hunter Lee <[email protected]>
Authored: Mon Jul 9 17:07:41 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Mon Jul 9 18:23:43 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AssignableInstanceManager.java   | 217 ++++++++++++++++
 .../helix/task/assigner/AssignableInstance.java |  37 +--
 .../task/TestAssignableInstanceManager.java     | 248 +++++++++++++++++++
 ...signableInstanceManagerControllerSwitch.java | 149 +++++++++++
 .../task/assigner/TestAssignableInstance.java   |  15 +-
 5 files changed, 646 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
new file mode 100644
index 0000000..4ede2b8
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -0,0 +1,217 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.apache.helix.task.assigner.TaskAssignResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AssignableInstanceManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AssignableInstanceManager.class);
+  // Instance name -> AssignableInstance
+  private Map<String, AssignableInstance> _assignableInstanceMap;
+  // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed
+  private Map<String, TaskAssignResult> _taskAssignResultMap;
+
+  /**
+   * Constructor for AssignableInstanceManager. Builds an AssignableInstance
+   * for each LiveInstance, then restores TaskAssignResults for INIT and
+   * RUNNING tasks found in the cache's JobContexts. Both maps are keyed by
+   * instance name; a LiveInstance with no matching InstanceConfig is skipped.
+   * @param clusterConfig cluster config passed to every AssignableInstance
+   * @param taskDataCache source of the JobConfigs and JobContexts to restore
+   * @param liveInstances instance name -> LiveInstance
+   * @param instanceConfigs instance name -> InstanceConfig
+   */
+  public AssignableInstanceManager(ClusterConfig clusterConfig, TaskDataCache 
taskDataCache,
+      Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> 
instanceConfigs) {
+    // Build the cache from scratch
+    _assignableInstanceMap = new HashMap<>();
+    _taskAssignResultMap = new HashMap<>();
+
+    // Create all AssignableInstance objects based on what's in liveInstances
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : 
liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      if (!instanceConfigs.containsKey(instanceName)) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+
+      // Create an AssignableInstance
+      AssignableInstance assignableInstance =
+          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+      _assignableInstanceMap.put(instanceConfig.getInstanceName(), 
assignableInstance);
+      LOG.info("AssignableInstance created for instance: {}", instanceName);
+    }
+
+    // Update task profiles by traversing all TaskContexts
+    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
+    for (String jobName : jobConfigMap.keySet()) {
+      JobConfig jobConfig = jobConfigMap.get(jobName);
+      JobContext jobContext = taskDataCache.getJobContext(jobName);
+      if (jobConfig == null || jobContext == null) {
+        LOG.warn(
+            "JobConfig or JobContext for this job is null. Skipping this job! 
Job name: {}, JobConfig: {}, JobContext: {}",
+            jobName, jobConfig, jobContext);
+        continue; // Ignore this job if either the config or context is null
+      }
+      Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer 
represents a task in
+      // this job (this is NOT taskId)
+      for (int taskIndex : taskIndices) {
+        TaskPartitionState taskState = jobContext.getPartitionState(taskIndex);
+        if (taskState == TaskPartitionState.INIT || taskState == 
TaskPartitionState.RUNNING) {
+          // Because task state is INIT or RUNNING, find the right 
AssignableInstance and subtract
+          // the right amount of resources. STOPPED means it's been cancelled, 
so it will be
+          // re-assigned and therefore does not use instances' resources
+
+          String assignedInstance = 
jobContext.getAssignedParticipant(taskIndex);
+          String taskId = jobContext.getTaskIdForPartition(taskIndex);
+          if (assignedInstance == null) {
+            LOG.warn(
+                "This task's TaskContext does not have an assigned instance! 
Task will be ignored. "
+                    + "Job: {}, TaskId: {}, TaskIndex: {}",
+                jobContext.getName(), taskId, taskIndex);
+            continue;
+          }
+          if (_assignableInstanceMap.containsKey(assignedInstance)) {
+            TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
+            AssignableInstance assignableInstance = 
_assignableInstanceMap.get(assignedInstance);
+            TaskAssignResult taskAssignResult =
+                assignableInstance.restoreTaskAssignResult(taskId, taskConfig);
+            if (taskAssignResult.isSuccessful()) {
+              _taskAssignResultMap.put(taskId, taskAssignResult);
+              LOG.info("TaskAssignResult restored for taskId: {}, assigned on 
instance: {}", taskId,
+                  assignedInstance);
+            }
+          } else {
+            LOG.warn(
+                "While building AssignableInstance map, discovered that the 
instance a task is assigned to is no "
+                    + "longer a LiveInstance! TaskAssignResult will not be 
created and no resource will be taken "
+                    + "up for this task. Job: {}, TaskId: {}, TaskIndex: {}, 
Instance: {}",
+                jobContext.getName(), taskId, taskIndex, assignedInstance);
+          }
+        }
+      }
+    }
+  }
+
+  /**
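+   * Updates AssignableInstances when there are any config changes, based on
+   * the given LiveInstances; instances that are no longer live are removed.
+   * @param clusterConfig cluster config applied to every AssignableInstance
+   * @param liveInstances instance name -> LiveInstance
+   * @param instanceConfigs instance name -> InstanceConfig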
+   */
+  public void updateAssignableInstances(ClusterConfig clusterConfig,
+      Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> 
instanceConfigs) {
+    // Keep a collection to determine what's no longer a LiveInstance, in 
which case the
+    // corresponding AssignableInstance must be removed
+    Collection<AssignableInstance> staleAssignableInstances =
+        new HashSet<>(_assignableInstanceMap.values());
+
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : 
liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      if (!instanceConfigs.containsKey(instanceName)) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+
+      // Update configs for currently existing instance
+      if (_assignableInstanceMap.containsKey(instanceName)) {
+        _assignableInstanceMap.get(instanceName).updateConfigs(clusterConfig, 
instanceConfig,
+            liveInstance);
+      } else {
+        // create a new AssignableInstance for a newly added LiveInstance; 
this is a new
+        // LiveInstance so TaskAssignResults are not re-created and no tasks 
are assigned
+        AssignableInstance assignableInstance =
+            new AssignableInstance(clusterConfig, instanceConfig, 
liveInstance);
+        _assignableInstanceMap.put(instanceName, assignableInstance);
+        LOG.info("AssignableInstance created for instance: {} during 
updateAssignableInstances",
+            instanceName);
+      }
+      // Remove because we've confirmed that this AssignableInstance is a 
LiveInstance as well
+      
staleAssignableInstances.remove(_assignableInstanceMap.get(instanceName));
+    }
+
+    // AssignableInstances whose instances are no longer live must be removed
+    // from the map, along with the TaskAssignResults of their tasks
+    for (AssignableInstance instanceToBeRemoved : staleAssignableInstances) {
+      // Remove all tasks on this instance first
+      for (String taskToRemove : instanceToBeRemoved.getCurrentAssignments()) {
+        // Check that AssignableInstances match
+        if 
(_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName()
+            .equals(instanceToBeRemoved.getInstanceName())) {
+          _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move 
this if necessary
+          LOG.info(
+              "TaskAssignResult removed because its assigned instance is no 
longer live. TaskID: {}, instance: {}",
+              taskToRemove, instanceToBeRemoved.getInstanceName());
+        }
+      }
+      _assignableInstanceMap.remove(instanceToBeRemoved.getInstanceName());
+      LOG.info(
+          "Non-live AssignableInstance removed for instance: {} during 
updateAssignableInstances",
+          instanceToBeRemoved.getInstanceName());
+    }
+  }
+
+  /**
+   * Returns all instanceName -> AssignableInstance mappings.
+   * @return assignableInstanceMap
+   */
+  public Map<String, AssignableInstance> getAssignableInstanceMap() {
+    return Collections.unmodifiableMap(_assignableInstanceMap);
+  }
+
+  /**
+   * Returns all AssignableInstances that support a given quota type.
+   * @param quotaType currently ignored; every AssignableInstance is returned
+   * @return unmodifiable set of AssignableInstances
+   */
+  public Set<AssignableInstance> getAssignableInstancesForQuotaType(String 
quotaType) {
+    // TODO: Currently, quota types are global settings across all 
AssignableInstances. When this
+    // TODO: becomes customizable, we need to actually implement this so that 
it doesn't return all
+    // TODO: AssignableInstances
+    return Collections.unmodifiableSet(new 
HashSet<>(_assignableInstanceMap.values()));
+  }
+
+  /**
+   * Returns taskId -> TaskAssignResult mappings.
+   * @return taskAssignResultMap
+   */
+  public Map<String, TaskAssignResult> getTaskAssignResultMap() {
+    return _taskAssignResultMap;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 8883987..b56b3b9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -374,25 +374,26 @@ public class AssignableInstance {
   }
 
   /**
-   * This method is used for forcing AssignableInstance to match current 
assignment state. It
-   * returns with TaskAssignResult for proper release current assignments when 
they are finished.
-   * @param tasks taskId -> taskConfig mapping
-   * @return taskId -> TaskAssignResult mapping
+   * This method restores a TaskAssignResult for a given task in this 
AssignableInstance when this
+   * AssignableInstance is created from scratch due to events like a 
controller switch. It
+   * returns a TaskAssignResult to be used for proper release of resources 
when the task is in a
+   * terminal state.
+   * @param taskId of the task
+   * @param taskConfig of the task
+   * @return TaskAssignResult with isSuccessful = true if successful. If 
assigning it to an instance
+   *         fails, TaskAssignResult's isSuccessful() will return false
    */
-  public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, 
TaskConfig> tasks) {
-    Map<String, TaskAssignResult> assignment = new HashMap<>();
-    for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) {
-      TaskAssignResult assignResult =
-          new TaskAssignResult(entry.getValue(), this, true, 
fitnessScoreFactor, null,
-              "Recovered TaskAssignResult from current state");
-      try {
-        assign(assignResult);
-        assignment.put(entry.getKey(), assignResult);
-      } catch (IllegalStateException e) {
-        logger.error("Failed to set current assignment for task {}.", 
entry.getValue().getId(), e);
-      }
+  public TaskAssignResult restoreTaskAssignResult(String taskId, TaskConfig 
taskConfig) {
+    TaskAssignResult assignResult = new TaskAssignResult(taskConfig, this, 
true, fitnessScoreFactor,
+        null, "Recovered TaskAssignResult from current state");
+    try {
+      assign(assignResult);
+    } catch (IllegalStateException e) {
+      logger.error("Failed to set current assignment for task {}.", taskId, e);
+      return new TaskAssignResult(taskConfig, this, false, fitnessScoreFactor,
+          null, "Recovered TaskAssignResult from current state");
     }
-    return assignment;
+    return assignResult;
   }
 
   /**
@@ -424,4 +425,4 @@ public class AssignableInstance {
   public Map<String, Map<String, Integer>> getUsedCapacity() {
     return _usedCapacity;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
new file mode 100644
index 0000000..a0457a6
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
@@ -0,0 +1,248 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.apache.helix.task.assigner.TaskAssignResult;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAssignableInstanceManager {
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final int NUM_JOBS = 3;
+  private static final int NUM_TASKS = 3;
+  private static final String CLUSTER_NAME = "TestCluster_0";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final String JOB_PREFIX = "Job_";
+  private static final String TASK_PREFIX = "Task_";
+
+  private ClusterConfig _clusterConfig;
+  private MockTaskDataCache _taskDataCache;
+  private AssignableInstanceManager _assignableInstanceManager;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private Set<String> _taskIDs; // To keep track of what tasks were created
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new 
Date(System.currentTimeMillis()));
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    _taskDataCache = new MockTaskDataCache(CLUSTER_NAME);
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _taskIDs = new HashSet<>();
+
+    // Populate live instances and their corresponding instance configs
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+
+    // Populate taskDataCache with JobConfigs and JobContexts
+    for (int i = 0; i < NUM_JOBS; i++) {
+      String jobName = JOB_PREFIX + i;
+
+      // Create a JobConfig
+      JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      for (int j = 0; j < NUM_TASKS; j++) {
+        String taskID = jobName + "_" + TASK_PREFIX + j;
+        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+        taskConfigBuilder.setTaskId(taskID);
+        _taskIDs.add(taskID);
+        taskConfigs.add(taskConfigBuilder.build());
+      }
+
+      jobConfigBuilder.setJobId(jobName);
+      jobConfigBuilder.addTaskConfigs(taskConfigs);
+      jobConfigBuilder.setCommand("MOCK");
+      jobConfigBuilder.setWorkflow("WORKFLOW");
+      _taskDataCache.addJobConfig(jobName, jobConfigBuilder.build());
+
+      // Create a JobContext
+      ZNRecord znRecord = new ZNRecord(JOB_PREFIX + "context_" + i);
+      JobContext jobContext = new MockJobContext(znRecord, _liveInstances, 
_taskIDs);
+      _taskDataCache.addJobContext(jobName, jobContext);
+      _taskIDs.clear();
+    }
+
+    // Create an AssignableInstanceManager
+    _assignableInstanceManager = new AssignableInstanceManager(_clusterConfig, 
_taskDataCache,
+        _liveInstances, _instanceConfigs);
+  }
+
+  @Test
+  public void testGetAssignableInstanceMap() {
+    Map<String, AssignableInstance> assignableInstanceMap =
+        _assignableInstanceManager.getAssignableInstanceMap();
+    for (String liveInstance : _liveInstances.keySet()) {
+      Assert.assertTrue(assignableInstanceMap.containsKey(liveInstance));
+    }
+  }
+
+  @Test
+  public void testGetTaskAssignResultMap() {
+    Map<String, TaskAssignResult> taskAssignResultMap =
+        _assignableInstanceManager.getTaskAssignResultMap();
+    for (String taskID : _taskIDs) {
+      Assert.assertTrue(taskAssignResultMap.containsKey(taskID));
+    }
+  }
+
+  @Test
+  public void testUpdateAssignableInstances() {
+    Map<String, LiveInstance> newLiveInstances = new HashMap<>();
+    Map<String, InstanceConfig> newInstanceConfigs = new HashMap<>();
+
+    // A brand new set of LiveInstances
+    for (int i = NUM_PARTICIPANTS; i < NUM_PARTICIPANTS + 3; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      newLiveInstances.put(instanceName, new LiveInstance(instanceName));
+      newInstanceConfigs.put(instanceName, new InstanceConfig(instanceName));
+    }
+
+    _assignableInstanceManager.updateAssignableInstances(_clusterConfig, 
newLiveInstances,
+        newInstanceConfigs);
+
+    // Check that the assignable instance map contains new instances and there 
are no
+    // TaskAssignResults due to previous live instances being removed
+    
Assert.assertEquals(_assignableInstanceManager.getTaskAssignResultMap().size(), 
0);
+    
Assert.assertEquals(_assignableInstanceManager.getAssignableInstanceMap().size(),
 newLiveInstances.size());
+    for (String instance : newLiveInstances.keySet()) {
+      
Assert.assertTrue(_assignableInstanceManager.getAssignableInstanceMap().containsKey(instance));
+    }
+  }
+
+  public class MockTaskDataCache extends TaskDataCache {
+    private Map<String, JobConfig> _jobConfigMap;
+    private Map<String, WorkflowConfig> _workflowConfigMap;
+    private Map<String, JobContext> _jobContextMap;
+    private Map<String, WorkflowContext> _workflowContextMap;
+
+    public MockTaskDataCache(String clusterName) {
+      super(clusterName);
+      _jobConfigMap = new HashMap<>();
+      _workflowConfigMap = new HashMap<>();
+      _jobContextMap = new HashMap<>();
+      _workflowContextMap = new HashMap<>();
+    }
+
+    public void addJobConfig(String jobName, JobConfig jobConfig) {
+      _jobConfigMap.put(jobName, jobConfig);
+    }
+
+    public void addJobContext(String jobName, JobContext jobContext) {
+      _jobContextMap.put(jobName, jobContext);
+    }
+
+    public void addWorkflowConfig(String workflowName, WorkflowConfig 
workflowConfig) {
+      _workflowConfigMap.put(workflowName, workflowConfig);
+    }
+
+    public void addWorkflowContext(String workflowName, WorkflowContext 
workflowContext) {
+      _workflowContextMap.put(workflowName, workflowContext);
+    }
+
+    @Override
+    public JobContext getJobContext(String jobName) {
+      return _jobContextMap.get(jobName);
+    }
+
+    @Override
+    public Map<String, JobConfig> getJobConfigMap() {
+      return _jobConfigMap;
+    }
+
+    @Override
+    public Map<String, WorkflowConfig> getWorkflowConfigMap() {
+      return _workflowConfigMap;
+    }
+
+    public Map<String, JobContext> getJobContextMap() {
+      return _jobContextMap;
+    }
+
+    public Map<String, WorkflowContext> getWorkflowContextMap() {
+      return _workflowContextMap;
+    }
+  }
+
+  public class MockJobContext extends JobContext {
+    private Set<Integer> _taskPartitionSet;
+    private Map<Integer, TaskPartitionState> _taskPartitionStateMap;
+    private Map<Integer, String> _partitionToTaskIDMap;
+    private Map<Integer, String> _taskToInstanceMap;
+
+    public MockJobContext(ZNRecord record, Map<String, LiveInstance> 
liveInstanceMap,
+        Set<String> taskIDs) {
+      super(record);
+      _taskPartitionSet = new HashSet<>();
+      _taskPartitionStateMap = new HashMap<>();
+      _partitionToTaskIDMap = new HashMap<>();
+      _taskToInstanceMap = new HashMap<>();
+
+      List<String> taskIDList = new ArrayList<>(taskIDs);
+      for (int i = 0; i < taskIDList.size(); i++) {
+        _taskPartitionSet.add(i);
+        _taskPartitionStateMap.put(i, TaskPartitionState.RUNNING);
+        _partitionToTaskIDMap.put(i, taskIDList.get(i));
+        String someInstance = liveInstanceMap.keySet().iterator().next();
+        _taskToInstanceMap.put(i, someInstance);
+      }
+    }
+
+    @Override
+    public Set<Integer> getPartitionSet() {
+      return _taskPartitionSet;
+    }
+
+    @Override
+    public TaskPartitionState getPartitionState(int p) {
+      return _taskPartitionStateMap.get(p);
+    }
+
+    @Override
+    public String getAssignedParticipant(int p) {
+      return _taskToInstanceMap.get(p);
+    }
+
+    @Override
+    public String getTaskIdForPartition(int p) {
+      return _partitionToTaskIDMap.get(p);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
new file mode 100644
index 0000000..71a5e65
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
@@ -0,0 +1,149 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.apache.helix.task.assigner.TaskAssignResult;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAssignableInstanceManagerControllerSwitch extends 
TaskTestBase {
+  protected int numJobs = 2;
+  protected int numTasks = 3;
+
+  @Test
+  public void testControllerSwitch() throws InterruptedException {
+    setupAndRunJobs();
+
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+
+    RoutingTableProvider routingTableProvider = new 
RoutingTableProvider(_manager);
+    Collection<LiveInstance> liveInstances = 
routingTableProvider.getLiveInstances();
+    for (LiveInstance liveInstance : liveInstances) {
+      String instanceName = liveInstance.getInstanceName();
+      liveInstanceMap.put(instanceName, liveInstance);
+      instanceConfigMap.put(instanceName,
+          
_setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instanceName));
+    }
+
+    // Get ClusterConfig
+    ClusterConfig clusterConfig = 
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+
+    // Initialize TaskDataCache
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    TaskDataCache taskDataCache = new TaskDataCache(CLUSTER_NAME);
+    Map<String, ResourceConfig> resourceConfigMap =
+        accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), 
true);
+
+    // Wait for the job pipeline
+    Thread.sleep(2000);
+    taskDataCache.refresh(accessor, resourceConfigMap);
+
+    AssignableInstanceManager prevAssignableInstanceManager = new 
AssignableInstanceManager(
+        clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap);
+    Map<String, AssignableInstance> prevAssignableInstanceMap =
+        new 
HashMap<>(prevAssignableInstanceManager.getAssignableInstanceMap());
+    Map<String, TaskAssignResult> prevTaskAssignResultMap =
+        new HashMap<>(prevAssignableInstanceManager.getTaskAssignResultMap());
+
+    // Stop the current controller
+    _controller.syncStop();
+    _controller = null;
+    // Start a new controller
+    String newControllerName = CONTROLLER_PREFIX + "_1";
+    ClusterControllerManager newController =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, newControllerName);
+    newController.syncStart();
+
+    // Generate a new AssignableInstanceManager
+    taskDataCache.refresh(accessor, resourceConfigMap);
+    AssignableInstanceManager newAssignableInstanceManager = new 
AssignableInstanceManager(
+        clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap);
+    Map<String, AssignableInstance> newAssignableInstanceMap =
+        new HashMap<>(newAssignableInstanceManager.getAssignableInstanceMap());
+    Map<String, TaskAssignResult> newTaskAssignResultMap =
+        new HashMap<>(newAssignableInstanceManager.getTaskAssignResultMap());
+
+    // Compare prev and new - they should match up exactly
+    Assert.assertEquals(prevAssignableInstanceMap.size(), 
newAssignableInstanceMap.size());
+    Assert.assertEquals(prevTaskAssignResultMap.size(), 
newTaskAssignResultMap.size());
+    for (Map.Entry<String, AssignableInstance> assignableInstanceEntry : 
newAssignableInstanceMap
+        .entrySet()) {
+      String instance = assignableInstanceEntry.getKey();
+      
Assert.assertEquals(prevAssignableInstanceMap.get(instance).getCurrentAssignments(),
+          assignableInstanceEntry.getValue().getCurrentAssignments());
+      
Assert.assertEquals(prevAssignableInstanceMap.get(instance).getTotalCapacity(),
+          assignableInstanceEntry.getValue().getTotalCapacity());
+      
Assert.assertEquals(prevAssignableInstanceMap.get(instance).getUsedCapacity(),
+          assignableInstanceEntry.getValue().getUsedCapacity());
+    }
+    for (Map.Entry<String, TaskAssignResult> taskAssignResultEntry : 
newTaskAssignResultMap
+        .entrySet()) {
+      String taskID = taskAssignResultEntry.getKey();
+      Assert.assertEquals(prevTaskAssignResultMap.get(taskID).toString(),
+          taskAssignResultEntry.getValue().toString());
+    }
+  }
+
+  private void setupAndRunJobs() {
+    // Create a workflow with some long-running jobs in progress
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    for (int i = 0; i < numJobs; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      for (int j = 0; j < numTasks; j++) {
+        String taskID = "JOB_" + i + "_TASK_" + j;
+        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+        taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND)
+            .addConfig(MockTask.JOB_DELAY, "120000");
+        taskConfigs.add(taskConfigBuilder.build());
+      }
+      String jobName = "JOB_" + i;
+      JobConfig.Builder jobBuilder =
+          new 
JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000)
+              .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+              .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
+              .setFailureThreshold(100000)
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, 
"120000")); // Long-running
+      // job
+      builder.addJob(jobName, jobBuilder);
+    }
+    // Start the workflow
+    _driver.start(builder.build());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index f1c92e3..a1a5852 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -28,6 +28,8 @@ import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
 
 public class TestAssignableInstance extends AssignerTestBase {
 
@@ -285,7 +287,16 @@ public class TestAssignableInstance extends 
AssignerTestBase {
     unsupportedTask.setQuotaType("UnsupportedQuotaType");
     currentAssignments.put("unsupportedTask", unsupportedTask);
 
-    Map<String, TaskAssignResult> results = 
ai.setCurrentAssignments(currentAssignments);
+    Map<String, TaskAssignResult> results = Maps.newHashMap();
+    for (Map.Entry<String, TaskConfig> entry : currentAssignments.entrySet()) {
+      String taskID = entry.getKey();
+      TaskConfig taskConfig = entry.getValue();
+      TaskAssignResult taskAssignResult = ai.restoreTaskAssignResult(taskID, 
taskConfig);
+      if (taskAssignResult.isSuccessful()) {
+        results.put(taskID, taskAssignResult);
+      }
+    }
+
     for (TaskAssignResult rst : results.values()) {
       Assert.assertTrue(rst.isSuccessful());
       Assert.assertEquals(rst.getAssignableInstance(), ai);
@@ -320,4 +331,4 @@ public class TestAssignableInstance extends 
AssignerTestBase {
     }
     return expectedQuotaPerType;
   }
-}
+}
\ No newline at end of file

Reply via email to