This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd270f  Fix the order of workflow context update
cfd270f is described below

commit cfd270f2ab377126c36fd71151d414aee55baf66
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Wed Sep 4 12:39:45 2019 -0700

    Fix the order of workflow context update
    
    * Fix the order of workflow context update
    
    In this commit:
    The order in which the workflow dispatcher updates the workflow status has been changed.
    If an execution delay is set and a job is in flight, the context will get updated.
    An integration test has been added.
    
    * minor fixes
---
 .../org/apache/helix/task/WorkflowDispatcher.java  | 27 +++----
 .../task/TestStopWorkflowWithExecutionDelay.java   | 93 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 14 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 01c5e09..88f2168 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -107,21 +107,9 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
       // future cleanup work
     }
 
-    // Step 3: handle workflow that should STOP
-    // For workflows that already reached final states, STOP should not take 
into effect
-    if (!finalStates.contains(workflowCtx.getWorkflowState())
-        && TargetState.STOP.equals(targetState)) {
-      LOG.info("Workflow " + workflow + "is marked as stopped.");
-      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-        _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
-      }
-      return;
-    }
-
     long currentTime = System.currentTimeMillis();
 
-    // Step 4: Check and process finished workflow context (confusing,
+    // Step 3: Check and process finished workflow context (confusing,
     // but its inside isWorkflowFinished())
     // Check if workflow has been finished and mark it if it is. Also update 
cluster status
     // monitor if provided
@@ -134,7 +122,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
       _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
     }
 
-    // Step 5: Handle finished workflows
+    // Step 4: Handle finished workflows
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
       LOG.info("Workflow " + workflow + " is finished.");
       long expiryTime = workflowCfg.getExpiry();
@@ -174,6 +162,17 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
           workflow));
     }
 
+    // Step 5: handle workflow that should STOP
+    // For workflows that have already reached final states, STOP should not take effect.
+    if (!finalStates.contains(workflowCtx.getWorkflowState())
+        && TargetState.STOP.equals(targetState)) {
+      LOG.info("Workflow " + workflow + " is marked as stopped. Workflow state 
is " + workflowCtx.getWorkflowState());
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
+      }
+      return;
+    }
     _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflowWithExecutionDelay.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflowWithExecutionDelay.java
new file mode 100644
index 0000000..c7fc988
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflowWithExecutionDelay.java
@@ -0,0 +1,93 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * This test checks whether stopping a workflow works properly when an execution delay is set.
+ */
+public class TestStopWorkflowWithExecutionDelay extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+  }
+
+  @Test
+  public void testStopWorkflowWithExecutionDelay() throws Exception {
+    // Execution delay is set to 20 milliseconds. Any delay that causes the job to go to the
+    // inflight job queue is sufficient for this test.
+    final long executionDelay = 20L;
+    // Timeout per task has been set to be a large number.
+    final long timeout = 60000L;
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    // Workflow DAG Schematic:
+    //          JOB0
+    //           /\
+    //          /  \
+    //         /    \
+    //       JOB1   JOB2
+
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        
.setTimeoutPerTask(timeout).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    JobConfig.Builder jobBuilder2 = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        
.setTimeoutPerTask(timeout).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    JobConfig.Builder jobBuilder3 = 
JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        
.setTimeoutPerTask(timeout).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    builder.addParentChildDependency("JOB0", "JOB1");
+    builder.addParentChildDependency("JOB0", "JOB2");
+    builder.addJob("JOB0", jobBuilder.setExecutionDelay(executionDelay));
+    builder.addJob("JOB1", jobBuilder2);
+    builder.addJob("JOB2", jobBuilder3);
+
+    _driver.start(builder.build());
+
+    // Wait until the workflow context is created and the workflow is running.
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Check that JOB0 is running.
+    _driver.pollForJobState(workflowName, 
TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+        TaskState.IN_PROGRESS);
+
+    // Stop the workflow
+    _driver.stop(workflowName);
+
+    _driver.pollForWorkflowState(workflowName, TaskState.STOPPED);
+  }
+}

Reply via email to