This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 82c4640d0c2ed79ae69804a28cbaeb1e086d4342 Author: Ali Reza Zamani Zadeh Najari <[email protected]> AuthorDate: Fri Jul 24 11:12:28 2020 -0700 Quota calculation based on CurrentState (#1165) Calculate quota based on CurrentState In this commit, the new methods have been added to AssignableInstanceManager which allow controller to calculate quota based on CurrentState and pending messages. --- .../WorkflowControllerDataProvider.java | 11 -- .../controller/stages/CurrentStateOutput.java | 14 +- .../stages/task/TaskSchedulingStage.java | 8 + .../helix/task/AssignableInstanceManager.java | 172 +++++++++++++++++++ .../helix/integration/task/TestStuckTaskQuota.java | 189 +++++++++++++++++++++ 5 files changed, 382 insertions(+), 12 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java index d5bc11e..45e1319 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java @@ -92,17 +92,6 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider { // Refresh TaskCache _taskDataCache.refresh(accessor, getResourceConfigMap()); - // Refresh AssignableInstanceManager - AssignableInstanceManager assignableInstanceManager = - _taskDataCache.getAssignableInstanceManager(); - - // Build from scratch every time - assignableInstanceManager.buildAssignableInstances(getClusterConfig(), _taskDataCache, - getLiveInstances(), getInstanceConfigMap()); - - // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being - assignableInstanceManager.logQuotaProfileJSON(false); - long duration = System.currentTimeMillis() - startTime; LogUtil.logInfo(logger, getClusterEventId(), String.format( "END: 
WorkflowControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline", diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java index dc82a61..a81fd2c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java @@ -273,7 +273,7 @@ public class CurrentStateOutput { } /** - * Given resource, returns current state map (parition -> instance -> currentState) + * Given resource, returns current state map (partition -> instance -> currentState) * @param resourceName * @return */ @@ -338,6 +338,18 @@ public class CurrentStateOutput { } /** + * Given resource, returns pending message map (partition -> instance -> message) + * @param resourceName name of the resource to look up + * @return the pending message map of the resource, or an empty map if none is recorded for it + */ + public Map<Partition, Map<String, Message>> getPendingMessageMap(String resourceName) { + if (_pendingMessageMap.containsKey(resourceName)) { + return _pendingMessageMap.get(resourceName); + } + return Collections.emptyMap(); + } + + /** * Get the partitions mapped in the current state * @param resourceId resource to look up * @return set of mapped partitions, or empty set if there are none diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java index 430de4b..5b4d580 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java @@ -76,6 +76,14 @@ public class TaskSchedulingStage extends AbstractBaseStage { "Missing attributes in event:" + event + ". 
Requires CURRENT_STATE|RESOURCES|DataCache"); } + + // Build quota capacity based on Current State and Pending Messages + cache.getAssignableInstanceManager().buildAssignableInstancesFromCurrentState( + cache.getClusterConfig(), cache.getTaskDataCache(), cache.getLiveInstances(), cache.getInstanceConfigMap(), + currentStateOutput, resourceMap); + + cache.getAssignableInstanceManager().logQuotaProfileJSON(false); + // Reset current INIT/RUNNING tasks on participants for throttling cache.resetActiveTaskCount(currentStateOutput); diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java index eb966ac..cca9335 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java +++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java @@ -27,9 +27,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; import org.apache.helix.task.assigner.AssignableInstance; import org.apache.helix.task.assigner.TaskAssignResult; import org.codehaus.jackson.JsonNode; @@ -177,6 +181,174 @@ public class AssignableInstanceManager { } /** + * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from + * CurrentState. It re-computes current quota profile for each AssignableInstance. + * If a task's current state is INIT or RUNNING, or if there is a pending message whose ToState + * is RUNNING, the task/partition will be assigned to AssignableInstances of the instance.
+ * @param clusterConfig cluster config passed to every newly created AssignableInstance + * @param taskDataCache source of JobConfigs and JobContexts + * @param liveInstances live instances keyed by instance name + * @param instanceConfigs instance configs keyed by instance name; live instances without one are skipped + * @param currentStateOutput source of task current states and pending messages + * @param resourceMap resources to scan; only those using the Task state model are counted + */ + public void buildAssignableInstancesFromCurrentState(ClusterConfig clusterConfig, + TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances, + Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput currentStateOutput, + Map<String, Resource> resourceMap) { + _assignableInstanceMap.clear(); + _taskAssignResultMap.clear(); + + // Create all AssignableInstance objects based on what's in liveInstances + for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) { + // Prepare instance-specific metadata + String instanceName = liveInstanceEntry.getKey(); + LiveInstance liveInstance = liveInstanceEntry.getValue(); + if (!instanceConfigs.containsKey(instanceName)) { + continue; // Ill-formatted input; skip over this instance + } + InstanceConfig instanceConfig = instanceConfigs.get(instanceName); + + // Create an AssignableInstance + AssignableInstance assignableInstance = + new AssignableInstance(clusterConfig, instanceConfig, liveInstance); + _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance); + LOG.debug("AssignableInstance created for instance: {}", instanceName); + } + + Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap(); + + // Update task profiles by traversing all CurrentStates and pending messages of Task resources + for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) { + String resourceName = resourceEntry.getKey(); + if (TaskConstants.STATE_MODEL_NAME.equals(resourceEntry.getValue().getStateModelDefRef())) { // null-safe + JobConfig jobConfig = jobConfigMap.get(resourceName); + JobContext jobContext = taskDataCache.getJobContext(resourceName); + String quotaType = getQuotaType(jobConfig); + Map<Partition, Map<String, String>> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName); + for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : currentStateMap + .entrySet()) { + Partition partition = currentStateMapEntry.getKey(); + String taskId = getTaskID(jobConfig, jobContext, partition); + for (Map.Entry<String, String> instanceCurrentStateEntry : currentStateMapEntry.getValue() + .entrySet()) { + String assignedInstance = instanceCurrentStateEntry.getKey(); + String taskState = instanceCurrentStateEntry.getValue(); + // If a task is in INIT or RUNNING state on the instance, this task should occupy one + // quota from this instance. + if (TaskPartitionState.INIT.name().equals(taskState) + || TaskPartitionState.RUNNING.name().equals(taskState)) { + assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType); + } + } + } + Map<Partition, Map<String, Message>> pendingMessageMap = + currentStateOutput.getPendingMessageMap(resourceName); + for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry : pendingMessageMap + .entrySet()) { + Partition partition = pendingMessageMapEntry.getKey(); + String taskId = getTaskID(jobConfig, jobContext, partition); + for (Map.Entry<String, Message> instancePendingMessageEntry : pendingMessageMapEntry + .getValue().entrySet()) { + String assignedInstance = instancePendingMessageEntry.getKey(); + String messageToState = instancePendingMessageEntry.getValue().getToState(); + // If there is a pending message on the instance which has ToState of RUNNING, the task + // will run on the instance soon, so it occupies one quota there unless already counted above.
+ if (TaskPartitionState.RUNNING.name().equals(messageToState) + && !TaskPartitionState.INIT.name().equals( + currentStateOutput.getCurrentState(resourceName, partition, assignedInstance)) + && !TaskPartitionState.RUNNING.name().equals(currentStateOutput + .getCurrentState(resourceName, partition, assignedInstance))) { + assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType); + } + } + } + } + } + LOG.info( + "AssignableInstanceManager built AssignableInstances from scratch based on CurrentState and pending messages."); + computeGlobalThreadBasedCapacity(); + } + + /** + * Restores the task's assignment on the AssignableInstance of the given instance, if that instance is live. + * @param instance name of the instance the task is assigned to + * @param jobConfig config of the task's job; may be null if the job has been deleted + * @param taskId ID of the task to restore + * @param quotaType quota type the task is charged against + */ + private void assignTaskToInstance(String instance, JobConfig jobConfig, String taskId, + String quotaType) { + if (_assignableInstanceMap.containsKey(instance)) { + TaskConfig taskConfig = getTaskConfig(jobConfig, taskId); + AssignableInstance assignableInstance = _assignableInstanceMap.get(instance); + TaskAssignResult taskAssignResult = + assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType); + if (taskAssignResult.isSuccessful()) { + _taskAssignResultMap.put(taskId, taskAssignResult); + LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId, + instance); + } + } else { + LOG.debug( + "While building AssignableInstance map, discovered that the instance a task is assigned to is no " + + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken " + + "up for this task. TaskId: {}, Instance: {}", + taskId, instance); + } + } + + /** + * Extract the quota type information of the Job + * @param jobConfig config of the job; may be null + * @return the job type, or AssignableInstance.DEFAULT_QUOTA_TYPE if jobConfig or its job type is null + */ + private String getQuotaType(JobConfig jobConfig) { + // If jobConfig is null (job has been deleted but participant has not dropped the task yet), use + // default quota for the task + if (jobConfig == null || jobConfig.getJobType() == null) { + return AssignableInstance.DEFAULT_QUOTA_TYPE; + } + return jobConfig.getJobType(); + } + + /** + * Calculate the TaskID based on the JobConfig and JobContext information + * @param jobConfig config of the job; may be null + * @param jobContext context of the job; may be null + * @param partition the task's partition + * @return the task ID, or the partition name if jobConfig or jobContext is null + */ + private String getTaskID(JobConfig jobConfig, JobContext jobContext, Partition partition) { + if (jobConfig == null || jobContext == null) { + // If JobConfig or JobContext is null, use the partition name + return partition.getPartitionName(); + } + int taskIndex = TaskUtil.getPartitionId(partition.getPartitionName()); + String taskId = jobContext.getTaskIdForPartition(taskIndex); + if (taskId == null) { + // For targeted tasks, taskId will be null + // We instead use pName (see FixedTargetTaskAssignmentCalculator) + taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex); + } + return taskId; + } + + /** + * Returns the TaskConfig of a task based on the JobConfig information + * @param jobConfig config of the job; may be null + * @param taskId ID of the task + * @return jobConfig.getTaskConfig(taskId), or a placeholder TaskConfig for taskId if jobConfig is null + */ + private TaskConfig getTaskConfig(JobConfig jobConfig, String taskId) { + if (jobConfig == null) { + return new TaskConfig(null, null, taskId, null); + } + return jobConfig.getTaskConfig(taskId); + } + + /** * Updates AssignableInstances when there are changes in LiveInstances or InstanceConfig. This * update only keeps an up-to-date count of AssignableInstances and does NOT re-build tasks * (because it's costly).
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java new file mode 100644 index 0000000..a118c6b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java @@ -0,0 +1,189 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + import java.util.HashMap; + import java.util.Map; + + import java.util.concurrent.CountDownLatch; + import org.apache.helix.TestHelper; + import org.apache.helix.integration.manager.MockParticipantManager; + import org.apache.helix.participant.StateMachineEngine; + import org.apache.helix.task.JobConfig; + import org.apache.helix.task.TaskCallbackContext; + import org.apache.helix.task.TaskFactory; + import org.apache.helix.task.TaskPartitionState; + import org.apache.helix.task.TaskState; + import org.apache.helix.task.TaskStateModelFactory; + import org.apache.helix.task.TaskUtil; + import org.apache.helix.task.Workflow; + import org.testng.Assert; + import org.testng.annotations.AfterClass; + import org.testng.annotations.BeforeClass; + import org.testng.annotations.Test; + + import com.google.common.collect.ImmutableMap; + + + public class TestStuckTaskQuota extends TaskTestBase { + private final CountDownLatch latch = new CountDownLatch(1); // Blocks NewMockTask.cancel() until released + + @BeforeClass + public void beforeClass() throws Exception { + _numNodes = 2; + super.beforeClass(); + + // Stop participants that have been started in super class + for (int i = 0; i < _numNodes; i++) { + super.stopParticipant(i); + Assert.assertFalse(_participants[i].isConnected()); + } + _participants = new MockParticipantManager[_numNodes]; + + // Start first participant + startParticipantAndRegisterNewMockTask(0); + } + + @AfterClass + public void afterClass() throws Exception { + super.afterClass(); + } + + @Test + public void testStuckTaskQuota() throws Exception { + String workflowName1 = TestHelper.getTestMethodName() + "_1"; + String workflowName2 = TestHelper.getTestMethodName() + "_2"; + String workflowName3 = TestHelper.getTestMethodName() + "_3"; + String jobName = "JOB0"; + JobConfig.Builder jobBuilder1 = + new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(40) + .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+ + JobConfig.Builder jobBuilder2 = new JobConfig.Builder().setWorkflow(workflowName2) + .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); + + JobConfig.Builder jobBuilder3 = new JobConfig.Builder().setWorkflow(workflowName3) + .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); + + Workflow.Builder workflowBuilder1 = + new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1); + Workflow.Builder workflowBuilder2 = + new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2); + Workflow.Builder workflowBuilder3 = + new Workflow.Builder(workflowName3).addJob(jobName, jobBuilder3); + + _driver.start(workflowBuilder1.build()); + + // Make sure the JOB0 of workflow1 is started and all of the tasks are assigned to the + // participant 0 + _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName), + TaskState.IN_PROGRESS); + + String participant0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); + for (int i = 0; i < 40; i++) { + int finalI = i; + Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING + .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName)) + .getPartitionState(finalI)) + && participant0 + .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName)) + .getAssignedParticipant(finalI))), + TestHelper.WAIT_DURATION)); + } + + // Start the second participant + startParticipantAndRegisterNewMockTask(1); + + _driver.start(workflowBuilder2.build()); + // Make sure the JOB0 of workflow2 is started and the only task of this job is assigned to + // participant1 + _driver.pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName), + TaskState.IN_PROGRESS); + String participant1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
+ Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING.equals(_driver + .getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName)).getPartitionState(0)) + && participant1 + .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName)) + .getAssignedParticipant(0))), + TestHelper.WAIT_DURATION)); + + // Delete the workflow1 + _driver.delete(workflowName1); + + // Since the tasks will be stuck for workflow1 after the deletion, the participant 0 is out of + // capacity. Hence, the new tasks should be assigned to participant 1 + _driver.start(workflowBuilder3.build()); + + // Make sure the JOB0 of workflow3 is started and the only task of this job is assigned to + // participant1 (both conditions are polled inside TestHelper.verify) + _driver.pollForJobState(workflowName3, TaskUtil.getNamespacedJobName(workflowName3, jobName), + TaskState.IN_PROGRESS); + + Assert.assertTrue(TestHelper + .verify(() -> (TaskPartitionState.RUNNING + .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName)) + .getPartitionState(0)) + && participant1 + .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName)) + .getAssignedParticipant(0))), + TestHelper.WAIT_DURATION)); + latch.countDown(); // Release the cancel() calls blocked in NewMockTask + // Stop the workflow2 and workflow3 + _driver.waitToStop(workflowName2, 5000L); + _driver.waitToStop(workflowName3, 5000L); + } + + private void startParticipantAndRegisterNewMockTask(int participantIndex) { + Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); + taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new); + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + participantIndex); + _participants[participantIndex] = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[participantIndex].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[participantIndex], taskFactoryReg)); + _participants[participantIndex].syncStart(); + } + + /** + * A mock task extending MockTask whose cancel() blocks until the test releases the latch. + */ + private class NewMockTask extends MockTask { + + NewMockTask(TaskCallbackContext context) { + super(context); + } + + @Override + public void cancel() { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Preserve the interrupt status instead of swallowing it + } + super.cancel(); + } + } +}
