Repository: helix Updated Branches: refs/heads/master 4511dbc6c -> 034424cc4
[HELIX-720] [TASK] Implement AssignableInstanceManager AssignableInstanceManager supports job quotas in Task Framework by 1. Re-creates AssignableInstance map with correct resource usage based on TaskContexts 2. Provides an update API that refreshes instances and configs. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/034424cc Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/034424cc Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/034424cc Branch: refs/heads/master Commit: 034424cc4852bda55e74ddbbf42db4c7f293262c Parents: 4511dbc Author: Hunter Lee <[email protected]> Authored: Mon Jul 9 17:07:41 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Mon Jul 9 18:23:43 2018 -0700 ---------------------------------------------------------------------- .../helix/task/AssignableInstanceManager.java | 217 ++++++++++++++++ .../helix/task/assigner/AssignableInstance.java | 37 +-- .../task/TestAssignableInstanceManager.java | 248 +++++++++++++++++++ ...signableInstanceManagerControllerSwitch.java | 149 +++++++++++ .../task/assigner/TestAssignableInstance.java | 15 +- 5 files changed, 646 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java new file mode 100644 index 0000000..4ede2b8 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java @@ -0,0 +1,217 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.assigner.AssignableInstance; +import org.apache.helix.task.assigner.TaskAssignResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AssignableInstanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(AssignableInstanceManager.class); + // Instance name -> AssignableInstance + private Map<String, AssignableInstance> _assignableInstanceMap; + // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed + private Map<String, TaskAssignResult> _taskAssignResultMap; + + /** + * Constructor for AssignableInstanceManager. Builds AssignableInstances based on + * JobConfigs, JobContexts, and LiveInstances. Note that the LiveInstance and InstanceConfig + * maps are both keyed by instance name; a LiveInstance with no InstanceConfig under the same + * name is skipped.
+ * @param clusterConfig + * @param taskDataCache + * @param liveInstances + * @param instanceConfigs + */ + public AssignableInstanceManager(ClusterConfig clusterConfig, TaskDataCache taskDataCache, + Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) { + // Build the cache from scratch + _assignableInstanceMap = new HashMap<>(); + _taskAssignResultMap = new HashMap<>(); + + // Create all AssignableInstance objects based on what's in liveInstances + for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) { + // Prepare instance-specific metadata + String instanceName = liveInstanceEntry.getKey(); + LiveInstance liveInstance = liveInstanceEntry.getValue(); + if (!instanceConfigs.containsKey(instanceName)) { + continue; // Ill-formatted input; skip over this instance + } + InstanceConfig instanceConfig = instanceConfigs.get(instanceName); + + // Create an AssignableInstance + AssignableInstance assignableInstance = + new AssignableInstance(clusterConfig, instanceConfig, liveInstance); + _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance); + LOG.info("AssignableInstance created for instance: {}", instanceName); + } + + // Update task profiles by traversing all TaskContexts + Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap(); + for (String jobName : jobConfigMap.keySet()) { + JobConfig jobConfig = jobConfigMap.get(jobName); + JobContext jobContext = taskDataCache.getJobContext(jobName); + if (jobConfig == null || jobContext == null) { + LOG.warn( + "JobConfig or JobContext for this job is null. Skipping this job! 
Job name: {}, JobConfig: {}, JobContext: {}", + jobName, jobConfig, jobContext); + continue; // Ignore this job if either the config or context is null + } + Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer represents a task in + // this job (this is NOT taskId) + for (int taskIndex : taskIndices) { + TaskPartitionState taskState = jobContext.getPartitionState(taskIndex); + if (taskState == TaskPartitionState.INIT || taskState == TaskPartitionState.RUNNING) { + // Because task state is INIT or RUNNING, find the right AssignableInstance and subtract + // the right amount of resources. STOPPED means it's been cancelled, so it will be + // re-assigned and therefore does not use instances' resources + + String assignedInstance = jobContext.getAssignedParticipant(taskIndex); + String taskId = jobContext.getTaskIdForPartition(taskIndex); + if (assignedInstance == null) { + LOG.warn( + "This task's TaskContext does not have an assigned instance! Task will be ignored. " + + "Job: {}, TaskId: {}, TaskIndex: {}", + jobContext.getName(), taskId, taskIndex); + continue; + } + if (_assignableInstanceMap.containsKey(assignedInstance)) { + TaskConfig taskConfig = jobConfig.getTaskConfig(taskId); + AssignableInstance assignableInstance = _assignableInstanceMap.get(assignedInstance); + TaskAssignResult taskAssignResult = + assignableInstance.restoreTaskAssignResult(taskId, taskConfig); + if (taskAssignResult.isSuccessful()) { + _taskAssignResultMap.put(taskId, taskAssignResult); + LOG.info("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId, + assignedInstance); + } + } else { + LOG.warn( + "While building AssignableInstance map, discovered that the instance a task is assigned to is no " + + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken " + + "up for this task. 
Job: {}, TaskId: {}, TaskIndex: {}, Instance: {}", + jobContext.getName(), taskId, taskIndex, assignedInstance); + } + } + } + } + } + + /** + * Updates AssignableInstances when there are any config changes. This update will be based on the + * list of LiveInstances provided. + * @param clusterConfig + * @param liveInstances + * @param instanceConfigs + */ + public void updateAssignableInstances(ClusterConfig clusterConfig, + Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) { + // Keep a collection to determine what's no longer a LiveInstance, in which case the + // corresponding AssignableInstance must be removed + Collection<AssignableInstance> staleAssignableInstances = + new HashSet<>(_assignableInstanceMap.values()); + + for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) { + // Prepare instance-specific metadata + String instanceName = liveInstanceEntry.getKey(); + LiveInstance liveInstance = liveInstanceEntry.getValue(); + if (!instanceConfigs.containsKey(instanceName)) { + continue; // Ill-formatted input; skip over this instance + } + InstanceConfig instanceConfig = instanceConfigs.get(instanceName); + + // Update configs for currently existing instance + if (_assignableInstanceMap.containsKey(instanceName)) { + _assignableInstanceMap.get(instanceName).updateConfigs(clusterConfig, instanceConfig, + liveInstance); + } else { + // create a new AssignableInstance for a newly added LiveInstance; this is a new + // LiveInstance so TaskAssignResults are not re-created and no tasks are assigned + AssignableInstance assignableInstance = + new AssignableInstance(clusterConfig, instanceConfig, liveInstance); + _assignableInstanceMap.put(instanceName, assignableInstance); + LOG.info("AssignableInstance created for instance: {} during updateAssignableInstances", + instanceName); + } + // Remove because we've confirmed that this AssignableInstance is a LiveInstance as well + 
staleAssignableInstances.remove(_assignableInstanceMap.get(instanceName)); + } + + // AssignableInstances that are not live need to be removed from the map because they are not + // live + for (AssignableInstance instanceToBeRemoved : staleAssignableInstances) { + // Remove all tasks on this instance first + for (String taskToRemove : instanceToBeRemoved.getCurrentAssignments()) { + // Check that AssignableInstances match + if (_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName() + .equals(instanceToBeRemoved.getInstanceName())) { + _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move this if necessary + LOG.info( + "TaskAssignResult removed because its assigned instance is no longer live. TaskID: {}, instance: {}", + taskToRemove, instanceToBeRemoved.getInstanceName()); + } + } + _assignableInstanceMap.remove(instanceToBeRemoved.getInstanceName()); + LOG.info( + "Non-live AssignableInstance removed for instance: {} during updateAssignableInstances", + instanceToBeRemoved.getInstanceName()); + } + } + + /** + * Returns all instanceName -> AssignableInstance mappings. + * @return assignableInstanceMap + */ + public Map<String, AssignableInstance> getAssignableInstanceMap() { + return Collections.unmodifiableMap(_assignableInstanceMap); + } + + /** + * Returns all AssignableInstances that support a given quota type. + * @param quotaType + * @return unmodifiable set of AssignableInstances + */ + public Set<AssignableInstance> getAssignableInstancesForQuotaType(String quotaType) { + // TODO: Currently, quota types are global settings across all AssignableInstances. When this + // TODO: becomes customizable, we need to actually implement this so that it doesn't return all + // TODO: AssignableInstances + return Collections.unmodifiableSet(new HashSet<>(_assignableInstanceMap.values())); + } + + /** + * Returns taskId -> TaskAssignResult mappings. 
+ * @return taskAssignResultMap + */ + public Map<String, TaskAssignResult> getTaskAssignResultMap() { + return _taskAssignResultMap; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java index 8883987..b56b3b9 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java @@ -374,25 +374,26 @@ public class AssignableInstance { } /** - * This method is used for forcing AssignableInstance to match current assignment state. It - * returns with TaskAssignResult for proper release current assignments when they are finished. - * @param tasks taskId -> taskConfig mapping - * @return taskId -> TaskAssignResult mapping + * This method restores a TaskAssignResult for a given task in this AssignableInstance when this + * AssignableInstance is created from scratch due to events like a controller switch. It + * returns a TaskAssignResult to be used for proper release of resources when the task is in a + * terminal state. + * @param taskId of the task + * @param taskConfig of the task + * @return TaskAssignResult with isSuccessful = true if successful. 
If assigning it to an instance + * fails, TaskAssignResult's isSuccessful() will return false + */ - public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, TaskConfig> tasks) { - Map<String, TaskAssignResult> assignment = new HashMap<>(); - for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) { - TaskAssignResult assignResult = - new TaskAssignResult(entry.getValue(), this, true, fitnessScoreFactor, null, - "Recovered TaskAssignResult from current state"); - try { - assign(assignResult); - assignment.put(entry.getKey(), assignResult); - } catch (IllegalStateException e) { - logger.error("Failed to set current assignment for task {}.", entry.getValue().getId(), e); - } + public TaskAssignResult restoreTaskAssignResult(String taskId, TaskConfig taskConfig) { + TaskAssignResult assignResult = new TaskAssignResult(taskConfig, this, true, fitnessScoreFactor, + null, "Recovered TaskAssignResult from current state"); + try { + assign(assignResult); + } catch (IllegalStateException e) { + logger.error("Failed to set current assignment for task {}.", taskId, e); + return new TaskAssignResult(taskConfig, this, false, fitnessScoreFactor, + null, "Recovered TaskAssignResult from current state"); } - return assignment; + return assignResult; } /** @@ -424,4 +425,4 @@ public class AssignableInstance { public Map<String, Map<String, Integer>> getUsedCapacity() { return _usedCapacity; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java new file mode 100644 index 0000000..a0457a6 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java @@ -0,0
+1,248 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.ZNRecord; +import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.task.assigner.AssignableInstance; +import org.apache.helix.task.assigner.TaskAssignResult; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestAssignableInstanceManager { + private static final int NUM_PARTICIPANTS = 3; + private static final int NUM_JOBS = 3; + private static final int NUM_TASKS = 3; + private static final String CLUSTER_NAME = "TestCluster_0"; + private static final String INSTANCE_PREFIX = "Instance_"; + private static final String JOB_PREFIX = "Job_"; + private static final String TASK_PREFIX = "Task_"; + + private ClusterConfig _clusterConfig; + private MockTaskDataCache _taskDataCache; + private 
AssignableInstanceManager _assignableInstanceManager; + private Map<String, LiveInstance> _liveInstances; + private Map<String, InstanceConfig> _instanceConfigs; + private Set<String> _taskIDs; // To keep track of what tasks were created + + @BeforeClass + public void beforeClass() { + System.out.println( + "START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis())); + _clusterConfig = new ClusterConfig(CLUSTER_NAME); + _taskDataCache = new MockTaskDataCache(CLUSTER_NAME); + _liveInstances = new HashMap<>(); + _instanceConfigs = new HashMap<>(); + _taskIDs = new HashSet<>(); + + // Populate live instances and their corresponding instance configs + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String instanceName = INSTANCE_PREFIX + i; + LiveInstance liveInstance = new LiveInstance(instanceName); + InstanceConfig instanceConfig = new InstanceConfig(instanceName); + _liveInstances.put(instanceName, liveInstance); + _instanceConfigs.put(instanceName, instanceConfig); + } + + // Populate taskDataCache with JobConfigs and JobContexts + for (int i = 0; i < NUM_JOBS; i++) { + String jobName = JOB_PREFIX + i; + + // Create a JobConfig + JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); + List<TaskConfig> taskConfigs = new ArrayList<>(); + for (int j = 0; j < NUM_TASKS; j++) { + String taskID = jobName + "_" + TASK_PREFIX + j; + TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder(); + taskConfigBuilder.setTaskId(taskID); + _taskIDs.add(taskID); + taskConfigs.add(taskConfigBuilder.build()); + } + + jobConfigBuilder.setJobId(jobName); + jobConfigBuilder.addTaskConfigs(taskConfigs); + jobConfigBuilder.setCommand("MOCK"); + jobConfigBuilder.setWorkflow("WORKFLOW"); + _taskDataCache.addJobConfig(jobName, jobConfigBuilder.build()); + + // Create a JobContext + ZNRecord znRecord = new ZNRecord(JOB_PREFIX + "context_" + i); + JobContext jobContext = new MockJobContext(znRecord, _liveInstances, _taskIDs); + 
_taskDataCache.addJobContext(jobName, jobContext); + _taskIDs.clear(); + } + + // Create an AssignableInstanceManager + _assignableInstanceManager = new AssignableInstanceManager(_clusterConfig, _taskDataCache, + _liveInstances, _instanceConfigs); + } + + @Test + public void testGetAssignableInstanceMap() { + Map<String, AssignableInstance> assignableInstanceMap = + _assignableInstanceManager.getAssignableInstanceMap(); + for (String liveInstance : _liveInstances.keySet()) { + Assert.assertTrue(assignableInstanceMap.containsKey(liveInstance)); + } + } + + @Test + public void testGetTaskAssignResultMap() { + Map<String, TaskAssignResult> taskAssignResultMap = + _assignableInstanceManager.getTaskAssignResultMap(); + for (String taskID : _taskIDs) { + Assert.assertTrue(taskAssignResultMap.containsKey(taskID)); + } + } + + @Test + public void testUpdateAssignableInstances() { + Map<String, LiveInstance> newLiveInstances = new HashMap<>(); + Map<String, InstanceConfig> newInstanceConfigs = new HashMap<>(); + + // A brand new set of LiveInstances + for (int i = NUM_PARTICIPANTS; i < NUM_PARTICIPANTS + 3; i++) { + String instanceName = INSTANCE_PREFIX + i; + newLiveInstances.put(instanceName, new LiveInstance(instanceName)); + newInstanceConfigs.put(instanceName, new InstanceConfig(instanceName)); + } + + _assignableInstanceManager.updateAssignableInstances(_clusterConfig, newLiveInstances, + newInstanceConfigs); + + // Check that the assignable instance map contains new instances and there are no + // TaskAssignResults due to previous live instances being removed + Assert.assertEquals(_assignableInstanceManager.getTaskAssignResultMap().size(), 0); + Assert.assertEquals(_assignableInstanceManager.getAssignableInstanceMap().size(), newLiveInstances.size()); + for (String instance : newLiveInstances.keySet()) { + Assert.assertTrue(_assignableInstanceManager.getAssignableInstanceMap().containsKey(instance)); + } + } + + public class MockTaskDataCache extends TaskDataCache { 
+ private Map<String, JobConfig> _jobConfigMap; + private Map<String, WorkflowConfig> _workflowConfigMap; + private Map<String, JobContext> _jobContextMap; + private Map<String, WorkflowContext> _workflowContextMap; + + public MockTaskDataCache(String clusterName) { + super(clusterName); + _jobConfigMap = new HashMap<>(); + _workflowConfigMap = new HashMap<>(); + _jobContextMap = new HashMap<>(); + _workflowContextMap = new HashMap<>(); + } + + public void addJobConfig(String jobName, JobConfig jobConfig) { + _jobConfigMap.put(jobName, jobConfig); + } + + public void addJobContext(String jobName, JobContext jobContext) { + _jobContextMap.put(jobName, jobContext); + } + + public void addWorkflowConfig(String workflowName, WorkflowConfig workflowConfig) { + _workflowConfigMap.put(workflowName, workflowConfig); + } + + public void addWorkflowContext(String workflowName, WorkflowContext workflowContext) { + _workflowContextMap.put(workflowName, workflowContext); + } + + @Override + public JobContext getJobContext(String jobName) { + return _jobContextMap.get(jobName); + } + + @Override + public Map<String, JobConfig> getJobConfigMap() { + return _jobConfigMap; + } + + @Override + public Map<String, WorkflowConfig> getWorkflowConfigMap() { + return _workflowConfigMap; + } + + public Map<String, JobContext> getJobContextMap() { + return _jobContextMap; + } + + public Map<String, WorkflowContext> getWorkflowContextMap() { + return _workflowContextMap; + } + } + + public class MockJobContext extends JobContext { + private Set<Integer> _taskPartitionSet; + private Map<Integer, TaskPartitionState> _taskPartitionStateMap; + private Map<Integer, String> _partitionToTaskIDMap; + private Map<Integer, String> _taskToInstanceMap; + + public MockJobContext(ZNRecord record, Map<String, LiveInstance> liveInstanceMap, + Set<String> taskIDs) { + super(record); + _taskPartitionSet = new HashSet<>(); + _taskPartitionStateMap = new HashMap<>(); + _partitionToTaskIDMap = new HashMap<>(); + 
_taskToInstanceMap = new HashMap<>(); + + List<String> taskIDList = new ArrayList<>(taskIDs); + for (int i = 0; i < taskIDList.size(); i++) { + _taskPartitionSet.add(i); + _taskPartitionStateMap.put(i, TaskPartitionState.RUNNING); + _partitionToTaskIDMap.put(i, taskIDList.get(i)); + String someInstance = liveInstanceMap.keySet().iterator().next(); + _taskToInstanceMap.put(i, someInstance); + } + } + + @Override + public Set<Integer> getPartitionSet() { + return _taskPartitionSet; + } + + @Override + public TaskPartitionState getPartitionState(int p) { + return _taskPartitionStateMap.get(p); + } + + @Override + public String getAssignedParticipant(int p) { + return _taskToInstanceMap.get(p); + } + + @Override + public String getTaskIdForPartition(int p) { + return _partitionToTaskIDMap.get(p); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java new file mode 100644 index 0000000..71a5e65 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java @@ -0,0 +1,149 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.spectator.RoutingTableProvider; +import org.apache.helix.task.assigner.AssignableInstance; +import org.apache.helix.task.assigner.TaskAssignResult; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase { + protected int numJobs = 2; + protected int numTasks = 3; + + @Test + public void testControllerSwitch() throws InterruptedException { + setupAndRunJobs(); + + Map<String, LiveInstance> liveInstanceMap = new HashMap<>(); + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + + RoutingTableProvider routingTableProvider = new RoutingTableProvider(_manager); + Collection<LiveInstance> liveInstances = routingTableProvider.getLiveInstances(); + for (LiveInstance liveInstance : liveInstances) 
{ + String instanceName = liveInstance.getInstanceName(); + liveInstanceMap.put(instanceName, liveInstance); + instanceConfigMap.put(instanceName, + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName)); + } + + // Get ClusterConfig + ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME); + + // Initialize TaskDataCache + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + TaskDataCache taskDataCache = new TaskDataCache(CLUSTER_NAME); + Map<String, ResourceConfig> resourceConfigMap = + accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true); + + // Wait for the job pipeline + Thread.sleep(2000); + taskDataCache.refresh(accessor, resourceConfigMap); + + AssignableInstanceManager prevAssignableInstanceManager = new AssignableInstanceManager( + clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap); + Map<String, AssignableInstance> prevAssignableInstanceMap = + new HashMap<>(prevAssignableInstanceManager.getAssignableInstanceMap()); + Map<String, TaskAssignResult> prevTaskAssignResultMap = + new HashMap<>(prevAssignableInstanceManager.getTaskAssignResultMap()); + + // Stop the current controller + _controller.syncStop(); + _controller = null; + // Start a new controller + String newControllerName = CONTROLLER_PREFIX + "_1"; + ClusterControllerManager newController = + new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, newControllerName); + newController.syncStart(); + + // Generate a new AssignableInstanceManager + taskDataCache.refresh(accessor, resourceConfigMap); + AssignableInstanceManager newAssignableInstanceManager = new AssignableInstanceManager( + clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap); + Map<String, AssignableInstance> newAssignableInstanceMap = + new HashMap<>(newAssignableInstanceManager.getAssignableInstanceMap()); + Map<String, TaskAssignResult> newTaskAssignResultMap = + new 
HashMap<>(newAssignableInstanceManager.getTaskAssignResultMap()); + + // Compare prev and new - they should match up exactly + Assert.assertEquals(prevAssignableInstanceMap.size(), newAssignableInstanceMap.size()); + Assert.assertEquals(prevTaskAssignResultMap.size(), newTaskAssignResultMap.size()); + for (Map.Entry<String, AssignableInstance> assignableInstanceEntry : newAssignableInstanceMap + .entrySet()) { + String instance = assignableInstanceEntry.getKey(); + Assert.assertEquals(prevAssignableInstanceMap.get(instance).getCurrentAssignments(), + assignableInstanceEntry.getValue().getCurrentAssignments()); + Assert.assertEquals(prevAssignableInstanceMap.get(instance).getTotalCapacity(), + assignableInstanceEntry.getValue().getTotalCapacity()); + Assert.assertEquals(prevAssignableInstanceMap.get(instance).getUsedCapacity(), + assignableInstanceEntry.getValue().getUsedCapacity()); + } + for (Map.Entry<String, TaskAssignResult> taskAssignResultEntry : newTaskAssignResultMap + .entrySet()) { + String taskID = taskAssignResultEntry.getKey(); + Assert.assertEquals(prevTaskAssignResultMap.get(taskID).toString(), + taskAssignResultEntry.getValue().toString()); + } + } + + private void setupAndRunJobs() { + // Create a workflow with some long-running jobs in progress + String workflowName = TestHelper.getTestMethodName(); + Workflow.Builder builder = new Workflow.Builder(workflowName); + for (int i = 0; i < numJobs; i++) { + List<TaskConfig> taskConfigs = new ArrayList<>(); + for (int j = 0; j < numTasks; j++) { + String taskID = "JOB_" + i + "_TASK_" + j; + TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder(); + taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND) + .addConfig(MockTask.JOB_DELAY, "120000"); + taskConfigs.add(taskConfigBuilder.build()); + } + String jobName = "JOB_" + i; + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000) + 
.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG) + .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true) + .setFailureThreshold(100000) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "120000")); // Long-running + // job + builder.addJob(jobName, jobBuilder); + } + // Start the workflow + _driver.start(builder.build()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/034424cc/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java index f1c92e3..a1a5852 100644 --- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java +++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java @@ -28,6 +28,8 @@ import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskStateModelFactory; import org.testng.Assert; import org.testng.annotations.Test; +import org.testng.collections.Maps; + public class TestAssignableInstance extends AssignerTestBase { @@ -285,7 +287,16 @@ public class TestAssignableInstance extends AssignerTestBase { unsupportedTask.setQuotaType("UnsupportedQuotaType"); currentAssignments.put("unsupportedTask", unsupportedTask); - Map<String, TaskAssignResult> results = ai.setCurrentAssignments(currentAssignments); + Map<String, TaskAssignResult> results = Maps.newHashMap(); + for (Map.Entry<String, TaskConfig> entry : currentAssignments.entrySet()) { + String taskID = entry.getKey(); + TaskConfig taskConfig = entry.getValue(); + TaskAssignResult taskAssignResult = ai.restoreTaskAssignResult(taskID, taskConfig); + if (taskAssignResult.isSuccessful()) { + results.put(taskID, taskAssignResult); + } + } + for (TaskAssignResult rst : 
results.values()) { Assert.assertTrue(rst.isSuccessful()); Assert.assertEquals(rst.getAssignableInstance(), ai); @@ -320,4 +331,4 @@ public class TestAssignableInstance extends AssignerTestBase { } return expectedQuotaPerType; } -} +} \ No newline at end of file
