This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit c122577bd7d5e5e90b4db2db2629e115331d39e7 Author: Ali Reza Zamani Zadeh Najari <[email protected]> AuthorDate: Thu Jun 4 10:52:54 2020 -0700 Remove previousAssignment in processTaskWithPendingMessage method (#1040) Remove previousAssignment in processTaskWithPendingMessage method The processTaskWithPendingMessage method is relying on the previousAssignment. In this commit, this method has been modified and previousAssignment has been replaced with currentState. --- .../apache/helix/task/AbstractTaskDispatcher.java | 74 +++++------ .../java/org/apache/helix/task/JobDispatcher.java | 7 +- ...eviousAssignedTaskStatusWithPendingMessage.java | 148 +++++++++++++++++++++ 3 files changed, 183 insertions(+), 46 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index 8934337..fa12203 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -66,9 +66,9 @@ public abstract class AbstractTaskDispatcher { // Job Update related methods public void updatePreviousAssignedTasksStatus( - Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String> excludedInstances, - String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg, - ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState, + Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, + Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput, + JobContext jobCtx, JobConfig jobCfg, TaskState jobState, Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs, Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState, Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache, @@ -130,8 +130,8 @@ public abstract class AbstractTaskDispatcher { // If there is a pending message whose destination state is different from the current // state, just make the same assignment as the pending message. This is essentially // "waiting" until this state transition is complete - processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance, - pendingMessage, jobState, currState, paMap, assignedPartitions); + processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState, + paMap, assignedPartitions); continue; } @@ -300,6 +300,9 @@ public abstract class AbstractTaskDispatcher { // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED partitionsToDropFromIs.add(pId); + assignedPartitions.get(instance).add(pId); + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); + // Also release resources for these tasks assignableInstanceManager.release(instance, taskConfig, quotaType); break; @@ -439,7 +442,6 @@ public abstract class AbstractTaskDispatcher { /** * Create an assignment based on an already-existing pending message. This effectively lets the * Controller to "wait" until the pending state transition has been processed. - * @param prevAssignment * @param pId * @param pName * @param instance @@ -449,43 +451,31 @@ public abstract class AbstractTaskDispatcher { * @param paMap * @param assignedPartitions */ - private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, - String pName, String instance, Message pendingMessage, TaskState jobState, - TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap, - Map<String, Set<Integer>> assignedPartitions) { - - // stateMap is a mapping of Instance -> TaskPartitionState (String) - Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); - if (stateMap != null) { - String prevState = stateMap.get(instance); - if (!pendingMessage.getToState().equals(prevState)) { - LOG.warn(String.format( - "Task pending to-state is %s while previous assigned state is %s. This should not" - + "happen.", - pendingMessage.getToState(), prevState)); + private void processTaskWithPendingMessage(Integer pId, String pName, String instance, + Message pendingMessage, TaskState jobState, TaskPartitionState currState, + Map<Integer, PartitionAssignment> paMap, Map<String, Set<Integer>> assignedPartitions) { + + if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT + && pendingMessage.getToState().equals(TaskPartitionState.RUNNING.name())) { + // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT, + // so that Helix will cancel the transition. + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name())); + assignedPartitions.get(instance).add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has a pending state transition on instance %s INIT->RUNNING. CurrentState is %s " + + "Setting it back to INIT so that Helix can cancel the transition(if enabled).", + pName, instance, currState.name())); } - if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT - && prevState.equals(TaskPartitionState.RUNNING.name())) { - // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT, - // so that Helix will cancel the transition. - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name())); - assignedPartitions.get(instance).add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has a pending state transition on instance %s INIT->RUNNING. Previous state %s" - + "Setting it back to INIT so that Helix can cancel the transition(if enabled).", - pName, instance, prevState)); - } - } else { - // Otherwise, Just copy forward - // the state assignment from the previous ideal state. - paMap.put(pId, new PartitionAssignment(instance, prevState)); - assignedPartitions.get(instance).add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.", - pName, instance, prevState)); - } + } else { + // Otherwise, Just copy forward + // the state assignment from the pending message + paMap.put(pId, new PartitionAssignment(instance, pendingMessage.getToState())); + assignedPartitions.get(instance).add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has a pending state transition on instance %s. Using the pending message ToState which was %s.", + pName, instance, pendingMessage.getToState())); } } } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index b35252c..10a1b7c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -251,10 +251,9 @@ public class JobDispatcher extends AbstractTaskDispatcher { } // Release resource for tasks in terminal state - updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource, - currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState, - assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, - tasksToDrop); + updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, + jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions, + partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop); addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg); diff --git a/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java b/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java new file mode 100644 index 0000000..f9127e1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestUpdatePreviousAssignedTaskStatusWithPendingMessage.java @@ -0,0 +1,148 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * This test checks the scheduling decision for the task that has already been assigned to an + * instance and there exists a message pending for that task. + */ +public class TestUpdatePreviousAssignedTaskStatusWithPendingMessage { + private static final String WORKFLOW_NAME = "TestWorkflow"; + private static final String INSTANCE_NAME = "TestInstance"; + private static final String JOB_NAME = "TestJob"; + private static final String PARTITION_NAME = "0"; + private static final String TARGET_RESOURCES = "TestDB"; + private static final int PARTITION_ID = 0; + + /** + * Scenario: + * JobState = TIMING_OUT + * Task State: Context= INIT, CurrentState = INIT + * Pending Message: FromState = INIT, ToState = RUNNING + */ + @Test + public void testTaskWithPendingMessageWhileJobTimingOut() { + JobDispatcher jobDispatcher = new JobDispatcher(); + // Preparing the inputs + Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments = new HashMap<>(); + SortedSet<Integer> tasks = new TreeSet<>(); + tasks.add(PARTITION_ID); + currentInstanceToTaskAssignments.put(INSTANCE_NAME, tasks); + Map<Integer, AbstractTaskDispatcher.PartitionAssignment> paMap = new TreeMap<>(); + CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.INIT, + TaskPartitionState.INIT, TaskPartitionState.RUNNING); + JobContext jobContext = prepareJobContext(TaskPartitionState.INIT); + JobConfig jobConfig = prepareJobConfig(); + Map<String, Set<Integer>> tasksToDrop = new HashMap<>(); + tasksToDrop.put(INSTANCE_NAME, new HashSet<>()); + WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(); + jobDispatcher.updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, + new HashSet<>(), JOB_NAME, currentStateOutput, jobContext, jobConfig, TaskState.TIMING_OUT, + new HashMap<>(), new HashSet<>(), paMap, TargetState.STOP, new HashSet<>(), cache, + tasksToDrop); + Assert.assertEquals(paMap.get(0)._state, TaskPartitionState.INIT.name()); + } + + /** + * Scenario: + * JobState = IN_PROGRESS + * Task State: Context= RUNNING, CurrentState = RUNNING + * Pending Message: FromState = RUNNING, ToState = DROPPED + */ + @Test + public void testTaskWithPendingMessage() { + JobDispatcher jobDispatcher = new JobDispatcher(); + // Preparing the inputs + Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments = new HashMap<>(); + SortedSet<Integer> tasks = new TreeSet<>(); + tasks.add(PARTITION_ID); + currentInstanceToTaskAssignments.put(INSTANCE_NAME, tasks); + Map<Integer, AbstractTaskDispatcher.PartitionAssignment> paMap = new TreeMap<>(); + CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING, + TaskPartitionState.RUNNING, TaskPartitionState.DROPPED); + JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING); + JobConfig jobConfig = prepareJobConfig(); + Map<String, Set<Integer>> tasksToDrop = new HashMap<>(); + tasksToDrop.put(INSTANCE_NAME, new HashSet<>()); + WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(); + jobDispatcher.updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, + new HashSet<>(), JOB_NAME, currentStateOutput, jobContext, jobConfig, TaskState.IN_PROGRESS, + new HashMap<>(), new HashSet<>(), paMap, TargetState.START, new HashSet<>(), cache, + tasksToDrop); + Assert.assertEquals(paMap.get(0)._state, TaskPartitionState.DROPPED.name()); + } + + private JobConfig prepareJobConfig() { + JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); + jobConfigBuilder.setWorkflow(WORKFLOW_NAME); + jobConfigBuilder.setCommand("TestCommand"); + jobConfigBuilder.setJobId(JOB_NAME); + List<String> targetPartition = new ArrayList<>(); + jobConfigBuilder.setTargetPartitions(targetPartition); + List<TaskConfig> taskConfigs = new ArrayList<>(); + TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder(); + taskConfigBuilder.setTaskId("0"); + taskConfigs.add(taskConfigBuilder.build()); + jobConfigBuilder.addTaskConfigs(taskConfigs); + return jobConfigBuilder.build(); + } + + private JobContext prepareJobContext(TaskPartitionState taskPartitionState) { + ZNRecord record = new ZNRecord(JOB_NAME); + JobContext jobContext = new JobContext(record); + jobContext.setStartTime(0L); + jobContext.setName(JOB_NAME); + jobContext.setStartTime(0L); + jobContext.setPartitionState(PARTITION_ID, taskPartitionState); + jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_0"); + return jobContext; + } + + private CurrentStateOutput prepareCurrentState(TaskPartitionState currentState, + TaskPartitionState messageFromState, TaskPartitionState messageToState) { + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK"); + Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME); + currentStateOutput.setCurrentState(JOB_NAME, taskPartition, INSTANCE_NAME, currentState.name()); + Message message = new Message(Message.MessageType.STATE_TRANSITION, "123456789"); + message.setFromState(messageFromState.name()); + message.setToState(messageToState.name()); + currentStateOutput.setPendingMessage(JOB_NAME, taskPartition, INSTANCE_NAME, message); + return currentStateOutput; + } +}
