Be able to stop a workflow when no job is running. Currently, to stop a workflow, the target state of the workflow is set to STOP; then, when each job (as a resource in the ideal state) is processed in the job rebalancer, it checks whether all the jobs in the workflow are done (not in IN_PROGRESS or STOPPING) and sets the workflow state to STOPPED. However, if all the jobs are already done, there's no job in the ideal state to process, so the workflow state never gets a chance to be set to STOPPED.
This commit adds a check in workflow rebalancer to set the state when all jobs are already done. A test is added to test specifically this case. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/408082a3 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/408082a3 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/408082a3 Branch: refs/heads/master Commit: 408082a33d91f84556c3da31232fb6d4097b4371 Parents: 94f3961 Author: Weihan Kong <[email protected]> Authored: Mon Feb 13 13:52:16 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Tue Oct 3 15:08:33 2017 -0700 ---------------------------------------------------------------------- .../apache/helix/task/WorkflowRebalancer.java | 4 ++ .../integration/task/TestStopWorkflow.java | 45 ++++++++++++++++++++ 2 files changed, 49 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 830f93a..8e72f7a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -76,6 +76,10 @@ public class WorkflowRebalancer extends TaskRebalancer { if (targetState == TargetState.STOP) { LOG.info("Workflow " + workflow + "is marked as stopped."); + if (isWorkflowStopped(workflowCtx, workflowCfg)) { + workflowCtx.setWorkflowState(TaskState.STOPPED); + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); + } return buildEmptyAssignment(workflow, currStateOutput); } 
http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java new file mode 100644 index 0000000..b641698 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java @@ -0,0 +1,45 @@ +package org.apache.helix.integration.task; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.TestHelper; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestStopWorkflow extends TaskTestBase { + @BeforeClass + public void beforeClass() throws Exception { + _numParitions = 1; + super.beforeClass(); + } + + @Test + public void testStopWorkflow() throws InterruptedException { + String jobQueueName = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) + .setMaxAttemptsPerTask(1) + .setWorkflow(jobQueueName) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1")); + + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); + jobQueue.enqueueJob("job1_will_succeed", jobBuilder); + jobQueue.enqueueJob("job2_will_fail", jobBuilder); + _driver.start(jobQueue.build()); + + // job1 should succeed and job2 should fail, wait until that happens + _driver.pollForJobState(jobQueueName, + TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED); + + 
Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS)); + + // Now stop the workflow, and it should be stopped because all jobs have completed or failed. + _driver.waitToStop(jobQueueName, 4000); + + Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED)); + } +}
