Repository: helix
Updated Branches:
refs/heads/master 1b4e0bbb8 -> 41ff38670
Fix a bug in stopping workflows
There was an edge case that recently came to our attention in the stop-workflow
logic. The symptom we observed was that when a workflow is cancelled via
TaskDriver, its tasks would flip-flop between the STOPPED and RUNNING states. This
was reproducible. Upon analysis, what was happening was that in
AbstractTaskDispatcher, when the current Task's state was STOPPED, the Participant
would sometimes try to process a message going to the COMPLETE/ERROR state, and
since the Task state model has no direct transition from STOPPED to
COMPLETE/ERROR, the task first went through the intermediate RUNNING state. In
short, a STOPPED->RUNNING transition was taking place, and once the task hit the
RUNNING state, the Controller would send it back to STOPPED since the target
state for the workflow is STOP.
The fix is to add a check on the current Task state and the target state of the
parent workflow before passing along the requested state assignment in
AbstractTaskDispatcher. A test was added to ensure that a workflow's tasks are
stopped properly when the workflow is stopped via TaskDriver.
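At its core, the guard added to the requested-state handling in
AbstractTaskDispatcher looks roughly like the following (a simplified excerpt of
the change; the full context, including where quotaType, assignableInstance, and
taskConfig come from, is in the diff below):

    // For STOPPED tasks whose parent workflow's target state is STOP, do not honor
    // the requested state transition; release the task's quota and continue (NOP).
    if (currState == TaskPartitionState.STOPPED && jobTgtState == TargetState.STOP) {
      assignableInstance.release(taskConfig, quotaType);
      continue;
    }
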
Changelist:
1. Add a check in AbstractTaskDispatcher before passing along the requested state
transition assignment: if the current Task state is STOPPED and the target state
for the workflow is STOP, do not make the assignment and just continue (NOP)
2. Add tests in TestStopWorkflow
3. Ensure that the quota resource is released for tasks at stop time
4. Identify an unreachable code snippet
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/41ff3867
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/41ff3867
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/41ff3867
Branch: refs/heads/master
Commit: 41ff38670345e03c3274cb5f56468513026eacb8
Parents: 1b4e0bb
Author: Hunter Lee <[email protected]>
Authored: Fri Aug 3 11:18:59 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Fri Sep 21 15:41:56 2018 -0700
----------------------------------------------------------------------
.../helix/task/AbstractTaskDispatcher.java | 93 ++++---
.../integration/task/TestStopWorkflow.java | 246 ++++++++++++++++++-
2 files changed, 304 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/41ff3867/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 9a5b899..bf33d77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -64,17 +64,34 @@ public abstract class AbstractTaskDispatcher {
      TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
          jobResource, pId, pName, instance, jobCtx);
-      // Check for pending state transitions on this (partition, instance).
+      // Check for pending state transitions on this (partition, instance). If there is a pending
+      // state transition, we prioritize this pending state transition and set the assignment from
+      // this pending state transition, essentially "waiting" until this pending message clears
      Message pendingMessage =
          currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
      if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
+        // If there is a pending message whose destination state is different from the current
+        // state, just make the same assignment as the pending message. This is essentially
+        // "waiting" until this state transition is complete
        processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
            pendingMessage, jobState, currState, paMap, assignedPartitions);
        continue;
      }
-      // Process any requested state transitions.
+      // Get AssignableInstance for this instance and TaskConfig for releasing resources
+      String quotaType = jobCfg.getJobType();
+      AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
+      String taskId;
+      if (TaskUtil.isGenericTaskJob(jobCfg)) {
+        taskId = jobCtx.getTaskIdForPartition(pId);
+      } else {
+        taskId = pName;
+      }
+      TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+
+      // Process any requested state transitions. If there is a requested state transition, just
+      // "wait" until this state transition is complete
      String requestedStateStr =
          currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
      if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
@@ -85,6 +102,14 @@ public abstract class AbstractTaskDispatcher {
requestedState, instance));
}
+        // For STOPPED tasks, if the targetState is STOP, we should not honor requestedState
+        // transition and make it a NOP
+        if (currState == TaskPartitionState.STOPPED && jobTgtState == TargetState.STOP) {
+          // This task is STOPPED and not going to be re-run, so release this task
+          assignableInstance.release(taskConfig, quotaType);
+          continue;
+        }
+
        paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
assignedPartitions.add(pId);
if (LOG.isDebugEnabled()) {
@@ -95,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
continue;
}
-      // Get AssignableInstance for this instance and TaskConfig for releasing resources
-      String quotaType = jobCfg.getJobType();
-      AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
-      String taskId;
-      if (TaskUtil.isGenericTaskJob(jobCfg)) {
-        taskId = jobCtx.getTaskIdForPartition(pId);
-      } else {
-        taskId = pName;
-      }
-      TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-
switch (currState) {
case RUNNING: {
TaskPartitionState nextState = TaskPartitionState.RUNNING;
@@ -128,23 +142,29 @@ public abstract class AbstractTaskDispatcher {
}
}
break;
-      case STOPPED: {
-        TaskPartitionState nextState;
-        if (jobTgtState.equals(TargetState.START)) {
-          nextState = TaskPartitionState.RUNNING;
-        } else {
-          nextState = TaskPartitionState.STOPPED;
-          // This task is STOPPED and not going to be re-run, so release this task
-          assignableInstance.release(taskConfig, quotaType);
-        }
+      case STOPPED: {
+        // TODO: This case statement might be unreachable code - Hunter
+        // This code may need to be removed because once a task is STOPPED and its workflow's
+        // targetState is STOP, we do not assign that stopped task. Not assigning means it will
+        // not be included in previousAssignment map in the next rebalance. If it is not in
+        // prevInstanceToTaskAssignments, it will never hit this part of the code
+        // When the parent workflow is to be resumed (target state is START), then it will just be
+        // assigned as if it were being assigned for the first time
+        TaskPartitionState nextState;
+        if (jobTgtState.equals(TargetState.START)) {
+          nextState = TaskPartitionState.RUNNING;
+        } else {
+          nextState = TaskPartitionState.STOPPED;
+          // This task is STOPPED and not going to be re-run, so release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
+        paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
+        assignedPartitions.add(pId);
-        paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
-        assignedPartitions.add(pId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-              nextState, instance));
-        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, nextState, instance));
        }
+      }
break;
case COMPLETED: {
        // The task has completed on this partition. Mark as such in the context object.
@@ -288,11 +308,25 @@ public abstract class AbstractTaskDispatcher {
return currentState;
}
+ /**
+   * Create an assignment based on an already-existing pending message. This effectively lets the
+   * Controller to "wait" until the pending state transition has been processed.
+ * @param prevAssignment
+ * @param pId
+ * @param pName
+ * @param instance
+ * @param pendingMessage
+ * @param jobState
+ * @param currState
+ * @param paMap
+ * @param assignedPartitions
+ */
  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
      String pName, String instance, Message pendingMessage, TaskState jobState,
      TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
      Set<Integer> assignedPartitions) {
+ // stateMap is a mapping of Instance -> TaskPartitionState (String)
    Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
if (stateMap != null) {
String prevState = stateMap.get(instance);
@@ -783,7 +817,8 @@ public abstract class AbstractTaskDispatcher {
failedJobs++;
if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
ctx.setWorkflowState(TaskState.FAILED);
-        LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.", cfg.getWorkflowId());
+        LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.",
+            cfg.getWorkflowId());
for (String jobToFail : cfg.getJobDag().getAllNodes()) {
if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
ctx.setJobState(jobToFail, TaskState.ABORTED);
http://git-wip-us.apache.org/repos/asf/helix/blob/41ff3867/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 8b23f56..4a25a57 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -1,17 +1,33 @@
package org.apache.helix.integration.task;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
public class TestStopWorkflow extends TaskTestBase {
+ private boolean _taskFinishFlag = false;
+
@BeforeClass
public void beforeClass() throws Exception {
_numPartitions = 1;
@@ -22,8 +38,7 @@ public class TestStopWorkflow extends TaskTestBase {
public void testStopWorkflow() throws InterruptedException {
String jobQueueName = TestHelper.getTestMethodName();
    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1)
-        .setWorkflow(jobQueueName)
+        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
@@ -35,11 +50,230 @@ public class TestStopWorkflow extends TaskTestBase {
    _driver.pollForJobState(jobQueueName,
        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
-    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+    Assert.assertTrue(
+        _driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
    // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
_driver.waitToStop(jobQueueName, 4000);
+ _driver.pollForWorkflowState(jobQueueName, TaskState.STOPPED);
+
+    Assert.assertTrue(
+        _driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+ }
+
+ /**
+   * Tests that stopping a workflow does result in its task ending up in STOPPED state.
+ * @throws InterruptedException
+ */
+ @Test
+ public void testStopTask() throws InterruptedException {
+ stopTestSetup(1);
+
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+ configBuilder.setAllowOverlapJobAssignment(true);
+ workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+ for (int i = 0; i < 1; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+ workflowBuilder.addJob("JOB" + i, jobConfigBulider);
+ }
+
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+ // Stop the workflow
+ _driver.stop(workflowName);
+ _driver.pollForWorkflowState(workflowName, TaskState.STOPPED);
+
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
+        TaskState.STOPPED);
+ }
+
+ /**
+   * Tests that stop() indeed frees up quotas for tasks belonging to the stopped workflow.
+ * @throws InterruptedException
+ */
+ @Test
+ public void testStopTaskForQuota() throws InterruptedException {
+ stopTestSetup(1);
+
+ String workflowNameToStop = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilderToStop = new Workflow.Builder(workflowNameToStop);
+    WorkflowConfig.Builder configBuilderToStop = new WorkflowConfig.Builder(workflowNameToStop);
+ configBuilderToStop.setAllowOverlapJobAssignment(true);
+ workflowBuilderToStop.setWorkflowConfig(configBuilderToStop.build());
+
+ // First create 50 jobs so that all 40 threads will be taken up
+ for (int i = 0; i < 50; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+ workflowBuilderToStop.addJob("JOB" + i, jobConfigBulider);
+ }
+
+ _driver.start(workflowBuilderToStop.build());
+ _driver.pollForWorkflowState(workflowNameToStop, TaskState.IN_PROGRESS);
+
+ // Stop the workflow
+ _driver.stop(workflowNameToStop);
+
+ _driver.pollForWorkflowState(workflowNameToStop, TaskState.STOPPED);
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowNameToStop).getWorkflowState(),
+        TaskState.STOPPED); // Check that the workflow has been stopped
+
+ // Generate another workflow to be completed this time around
+ String workflowToComplete = TestHelper.getTestMethodName() + "ToComplete";
+    Workflow.Builder workflowBuilderToComplete = new Workflow.Builder(workflowToComplete);
+    WorkflowConfig.Builder configBuilderToComplete = new WorkflowConfig.Builder(workflowToComplete);
+    configBuilderToComplete.setAllowOverlapJobAssignment(true);
+    workflowBuilderToComplete.setWorkflowConfig(configBuilderToComplete.build());
+
+ // Create 20 jobs that should complete
+ for (int i = 0; i < 20; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+ workflowBuilderToComplete.addJob("JOB" + i, jobConfigBulider);
+ }
+
+ // Start the workflow to be completed
+ _driver.start(workflowBuilderToComplete.build());
+ _driver.pollForWorkflowState(workflowToComplete, TaskState.COMPLETED);
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowToComplete).getWorkflowState(),
+        TaskState.COMPLETED);
+ }
+
+ /**
+ * Test that there is no thread leak when stopping and resuming.
+ * @throws InterruptedException
+ */
+ @Test
+ public void testResumeTaskForQuota() throws InterruptedException {
+ stopTestSetup(1);
+
+ String workflowName_1 = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder_1 = new Workflow.Builder(workflowName_1);
+    WorkflowConfig.Builder configBuilder_1 = new WorkflowConfig.Builder(workflowName_1);
+ configBuilder_1.setAllowOverlapJobAssignment(true);
+ workflowBuilder_1.setWorkflowConfig(configBuilder_1.build());
+
+ // 30 jobs run first
+ for (int i = 0; i < 30; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+ workflowBuilder_1.addJob("JOB" + i, jobConfigBulider);
+ }
+
+ _driver.start(workflowBuilder_1.build());
+
+ Thread.sleep(2000L); // Sleep until each task really is in progress
+ _driver.stop(workflowName_1);
+ _driver.pollForWorkflowState(workflowName_1, TaskState.STOPPED);
+
+ _taskFinishFlag = false;
+ _driver.resume(workflowName_1);
+ Thread.sleep(2000L); // Sleep until each task really is in progress
+
+ // By now there should only be 30 threads occupied
+
+ String workflowName_2 = TestHelper.getTestMethodName() + "_2";
+ Workflow.Builder workflowBuilder_2 = new Workflow.Builder(workflowName_2);
+    WorkflowConfig.Builder configBuilder_2 = new WorkflowConfig.Builder(workflowName_2);
+ configBuilder_2.setAllowOverlapJobAssignment(true);
+ workflowBuilder_2.setWorkflowConfig(configBuilder_2.build());
+
+ // Try to run 10 jobs that complete
+ int numJobs = 10;
+ for (int i = 0; i < numJobs; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+ workflowBuilder_2.addJob("JOB" + i, jobConfigBulider);
+ }
+
+    // If these jobs complete successfully, then that means there is no thread leak
+ _driver.start(workflowBuilder_2.build());
+    Assert.assertEquals(_driver.pollForWorkflowState(workflowName_2, TaskState.COMPLETED),
+        TaskState.COMPLETED);
+ }
+
+ /**
+   * Sets up an environment to make stop task testing easy. Shuts down all Participants and starts
+ * only one Participant.
+ */
+ private void stopTestSetup(int numNodes) {
+ // Set task callbacks
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+ TaskFactory taskFactory = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new StopTask(context);
+ }
+ };
+ TaskFactory taskFactoryComplete = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ };
+ taskFactoryReg.put("StopTask", taskFactory);
+ taskFactoryReg.put("CompleteTask", taskFactoryComplete);
+
+ stopParticipants();
+
+ for (int i = 0; i < numNodes; i++) {
+ String instanceName = _participants[i].getInstanceName();
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+ }
+
+ /**
+ * A mock task class that models a short-lived task to be stopped.
+ */
+ private class StopTask extends MockTask {
+ StopTask(TaskCallbackContext context) {
+ super(context);
+ }
+
+ @Override
+ public TaskResult run() {
+ while (!_taskFinishFlag) {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // This wait is to prevent the task from completing before being stopped
+ try {
+ Thread.sleep(500L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return new TaskResult(TaskResult.Status.COMPLETED, "");
+ }
-    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+ @Override
+ public void cancel() {
+ _taskFinishFlag = true;
+ }
}
-}
\ No newline at end of file
+}