Be able to stop workflow when no job is running.

Currently, to stop a workflow, the target state of the workflow is set to STOP. 
Then, when each job (as a resource in the ideal state) is processed by the job 
rebalancer, it checks whether all the jobs in the workflow are done (not 
IN_PROGRESS or STOPPING) and, if so, sets the workflow state to STOPPED.
However, if all the jobs are already done, there is no job in the ideal state to 
process, so the workflow state never gets a chance to be set to STOPPED.

This commit adds a check in workflow rebalancer to set the state when all jobs 
are already done.

A test is added that specifically covers this case.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/408082a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/408082a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/408082a3

Branch: refs/heads/master
Commit: 408082a33d91f84556c3da31232fb6d4097b4371
Parents: 94f3961
Author: Weihan Kong <[email protected]>
Authored: Mon Feb 13 13:52:16 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Tue Oct 3 15:08:33 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/task/WorkflowRebalancer.java   |  4 ++
 .../integration/task/TestStopWorkflow.java      | 45 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 830f93a..8e72f7a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -76,6 +76,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     if (targetState == TargetState.STOP) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+      }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
new file mode 100644
index 0000000..b641698
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -0,0 +1,45 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStopWorkflow extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testStopWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
+    jobQueue.enqueueJob("job2_will_fail", jobBuilder);
+    _driver.start(jobQueue.build());
+
+    // job1 should succeed and job2 should fail, wait until that happens
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+
+    // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
+    _driver.waitToStop(jobQueueName, 4000);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+  }
+}

Reply via email to