This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit f9f89a79768156ef7341262cbb25a40d7dafeb1e Author: narendly <naren...@gmail.com> AuthorDate: Mon Feb 25 17:51:17 2019 -0800 [HELIX-795] TASK: Drop tasks upon Participant reconnect This changes the default reset() behavior for tasks on Participants. Previously, it would send all task partitions to INIT. After this change, the task partitions will inherit the states from the previous session, and their RequestedState will be set to DROPPED. Then the Controller will send messages to drop the said task partitions so that there are no quota/resource leaks for the number of tasks on Participants. Changelist: 1. Modify state transition logic so that drop state transition messages will be honored 2. Modify CurrentState copy-over logic 3. Add an integration test: TestDropOnParticipantReset --- .../helix/manager/zk/CurStateCarryOverUpdater.java | 18 +++- .../helix/participant/HelixStateMachineEngine.java | 16 +++- .../java/org/apache/helix/task/JobDispatcher.java | 45 +++++++--- .../java/org/apache/helix/task/TaskStateModel.java | 13 ++- .../task/TestDropOnParticipantReset.java | 95 ++++++++++++++++++++++ 5 files changed, 167 insertions(+), 20 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java index b96de18..f52a669 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java @@ -22,6 +22,9 @@ package org.apache.helix.manager.zk; import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.ZNRecord; import org.apache.helix.model.CurrentState; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.TaskPartitionState; + /** * updater for carrying over last current states @@ -47,7 +50,7 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> { @Override public ZNRecord 
update(ZNRecord currentData) { - CurrentState curState = null; + CurrentState curState; if (currentData == null) { curState = new CurrentState(_lastCurState.getId()); // copy all simple fields settings and overwrite session-id to current session @@ -58,9 +61,16 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> { } for (String partitionName : _lastCurState.getPartitionStateMap().keySet()) { - // carry-over only when current-state not exist - if (curState.getState(partitionName) == null) { - curState.setState(partitionName, _initState); + // For tasks, we preserve previous session's CurrentStates and set RequestState to DROPPED so + // that they will be dropped by the Controller + if (_lastCurState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + curState.setState(partitionName, _lastCurState.getState(partitionName)); + curState.setRequestedState(partitionName, TaskPartitionState.DROPPED.name()); + } else { + // carry-over only when current-state does not exist for regular Helix resource partitions + if (curState.getState(partitionName) == null) { + curState.setState(partitionName, _initState); + } } } return curState.getRecord(); diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java index e235201..c75b0b8 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java +++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java @@ -45,6 +45,8 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelParser; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.TaskPartitionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,14 
+217,22 @@ public class HelixStateMachineEngine implements StateMachineEngine { _stateModelDefs.put(stateModelName, stateModelDef); } - if (message.getBatchMessageMode() == false) { + if (!message.getBatchMessageMode()) { String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState(); StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey); if (stateModel == null) { stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey); - stateModel.updateState(initState); + if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME) + && message.getToState().equals(TaskPartitionState.DROPPED.name())) { + // If stateModel is null, that means there was a reboot of the Participant. Then the + // purpose of this first message must be to drop the task. We manually set the current + // state to be the same state of fromState (which Controller inferred from JobContext) to + // allow the Participant to successfully process this dropping transition + stateModel.updateState(message.getFromState()); + } else { + stateModel.updateState(initState); + } } - if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { return new HelixStateTransitionCancellationHandler(stateModel, message, context); } else { diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 408c9f1..d72db7f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -25,11 +25,11 @@ import org.slf4j.LoggerFactory; public class JobDispatcher extends AbstractTaskDispatcher { private static final Logger LOG = LoggerFactory.getLogger(JobDispatcher.class); - private static final Set<TaskState> intemediateStates = new HashSet<>( - Arrays.asList(TaskState.IN_PROGRESS, TaskState.NOT_STARTED, TaskState.STOPPING, TaskState.STOPPED)); + // 
Intermediate states (meaning they are not terminal states) for workflows and jobs + private static final Set<TaskState> INTERMEDIATE_STATES = new HashSet<>(Arrays + .asList(TaskState.IN_PROGRESS, TaskState.NOT_STARTED, TaskState.STOPPING, TaskState.STOPPED)); private WorkflowControllerDataProvider _dataProvider; - public void updateCache(WorkflowControllerDataProvider cache) { _dataProvider = cache; } @@ -127,7 +127,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { jobState = workflowCtx.getJobState(jobName); workflowState = workflowCtx.getWorkflowState(); - if (intemediateStates.contains(jobState) && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) + if (INTERMEDIATE_STATES.contains(jobState) && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) || TaskState.TIMED_OUT.equals(workflowState))) { jobState = TaskState.TIMING_OUT; workflowCtx.setJobState(jobName, TaskState.TIMING_OUT); @@ -206,7 +206,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments = getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, - allPartitions); + allPartitions, currStateOutput, jobResource); long currentTime = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { @@ -365,28 +365,53 @@ public class JobDispatcher extends AbstractTaskDispatcher { * @param liveInstances * @param prevAssignment task partition -> (instance -> state) * @param allTaskPartitions all task partitionIds + * @param currStateOutput currentStates to make sure currentStates copied over expired sessions + * are accounted for + * @param jobName job name * @return instance -> partitionIds from previous assignment, if the instance is still live */ private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments( Iterable<String> liveInstances, ResourceAssignment prevAssignment, - Set<Integer> allTaskPartitions) { - Map<String, SortedSet<Integer>> result = new HashMap<String, 
SortedSet<Integer>>(); + Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName) { + Map<String, SortedSet<Integer>> result = new HashMap<>(); for (String instance : liveInstances) { result.put(instance, new TreeSet<Integer>()); } + // First, add all task partitions from JobContext for (Partition partition : prevAssignment.getMappedPartitions()) { int pId = TaskUtil.getPartitionId(partition.getPartitionName()); if (allTaskPartitions.contains(pId)) { Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition); for (String instance : replicaMap.keySet()) { - SortedSet<Integer> pList = result.get(instance); - if (pList != null) { - pList.add(pId); + SortedSet<Integer> pIdSet = result.get(instance); + if (pIdSet != null) { + pIdSet.add(pId); } } } } + + // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over + // from previous sessions won't show up in prevInstanceToTaskAssignments + // We need to add these back here in order for these task partitions to be dropped (after a + // copy-over, the Controller will send a message to drop the state currentState) + // partitions: (partition -> instance -> currentState) + Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName); + for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) { + // Get all (instance -> currentState) mappings + for (Map.Entry<String, String> instanceToCurrState : entry.getValue().entrySet()) { + String instance = instanceToCurrState.getKey(); + String requestedState = + currStateOutput.getRequestedState(jobName, entry.getKey(), instance); + int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName()); + if (result.containsKey(instance) && requestedState != null + && requestedState.equals(TaskPartitionState.DROPPED.name())) { + // Only if this instance is live and requestedState is DROPPED + result.get(instance).add(pId); + } + } + } return result; } diff 
--git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index 90ab2fb..34d6842 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -21,7 +21,9 @@ package org.apache.helix.task; import java.util.Map; import java.util.TimerTask; -import java.util.concurrent.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; @@ -189,8 +191,13 @@ public class TaskStateModel extends StateModel { public void onBecomeDroppedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + if (timeout_task != null) { + timeout_task.cancel(true); + } + LOG.error( + "The thread running the task partition {} was not found while attempting to cancel this task; Manual cleanup may be required for this task.", + taskPartition); + return; } _taskRunner.cancel(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java new file mode 100644 index 0000000..5dbde94 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java @@ -0,0 +1,95 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.TestHelper; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.Workflow; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestDropOnParticipantReset extends TaskTestBase { + + @BeforeClass + public void beforeClass() throws Exception { + _numDbs = 0; + _numPartitions = 0; + _numReplicas = 0; + _numNodes = 1; // Only bring up 1 instance + super.beforeClass(); + } + + /** + * Tests that upon Participant reconnect, the Controller correctly sends the current + * state of the task partition to DROPPED. This is to avoid a resource leak in case of a + * Participant disconnect/reconnect and an ensuing reset() on all of the partitions on that + * Participant. 
+ */ + @Test + public void testDropOnParticipantReset() throws InterruptedException { + // Create a workflow with some long-running jobs in progress + String workflowName = TestHelper.getTestMethodName(); + String jobName = "JOB"; + Workflow.Builder builder = new Workflow.Builder(workflowName); + List<TaskConfig> taskConfigs = new ArrayList<>(); + for (int j = 0; j < 2; j++) { // 2 tasks to ensure that they execute + String taskID = jobName + "_TASK_" + j; + TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder(); + taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND) + .addConfig(MockTask.JOB_DELAY, "3000"); + taskConfigs.add(taskConfigBuilder.build()); + } + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) + .setMaxAttemptsPerTask(10).setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG) + .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true) + // 1 task at a time + .setNumConcurrentTasksPerInstance(1); + builder.addJob(jobName, jobBuilder); + + // Modify maxConcurrentTask for the instance so that it only accepts 1 task at most + InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName()); + instanceConfig.setMaxConcurrentTask(1); + _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, + _participants[0].getInstanceName(), instanceConfig); + + // Start the workflow + _driver.start(builder.build()); + _driver.pollForJobState(workflowName, workflowName + "_" + jobName, TaskState.IN_PROGRESS); + Thread.sleep(1500L); // Wait for the Participant to process the message + // Stop and start the participant to mimic a connection issue + _participants[0].syncStop(); + // Upon starting the participant, the first task partition should be dropped and assigned anew + // on the instance. 
Then the rest of the tasks will execute and the workflow will complete + startParticipant(0); + + TaskState workflowState = _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + TaskState jobState = + _driver.pollForJobState(workflowName, workflowName + "_" + jobName, TaskState.COMPLETED); + Assert.assertEquals(workflowState, TaskState.COMPLETED); + Assert.assertEquals(jobState, TaskState.COMPLETED); + } +}