This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
     new 1e029fbad7 [Fix-14303][Master] Workflow with sub_process task can't be stopped w… (#14343)
1e029fbad7 is described below

commit 1e029fbad7d091f268397b1d51cfdfedcf3af786
Author: HomminLee <[email protected]>
AuthorDate: Wed Jul 19 11:13:21 2023 +0800

    [Fix-14303][Master] Workflow with sub_process task can't be stopped w… (#14343)
    
    * [Fix-14303][Master] Workflow with sub_process task can't be stopped when running
    
    * add test
    
    * fix error from cherry-pick
---
 .../master/runner/WorkflowExecuteRunnable.java     |  2 +-
 .../master/runner/task/SubTaskProcessor.java       | 19 +++++++-
 .../server/master/SubProcessTaskTest.java          | 54 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 4556767a88..8bbd0bb064 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -606,7 +606,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         Date scheduleDate = processInstance.getScheduleTime();
         if (scheduleDate == null) {
             if (CollectionUtils.isEmpty(complementListDate)) {
-                log.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId());
+                logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId());
 
                 return true;
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 8fe41da9e7..e53dfa0d6a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -129,14 +129,29 @@ public class SubTaskProcessor extends BaseTaskProcessor {
                 subProcessInstance.getId(),
                 subProcessInstance.getState());
         if (subProcessInstance != null && subProcessInstance.getState().isFinished()) {
-            // todo: check the status and transform
-            taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode()));
+            taskInstance.setState(subProcessFinishedStateTransform(subProcessInstance.getState()));
             taskInstance.setEndTime(new Date());
             dealFinish();
             processService.saveTaskInstance(taskInstance);
         }
     }
 
+    private TaskExecutionStatus subProcessFinishedStateTransform(WorkflowExecutionStatus state) {
+        if (state.isSuccess()) {
+            return TaskExecutionStatus.SUCCESS;
+        } else if (state.isFailure()) {
+            return TaskExecutionStatus.FAILURE;
+        } else if (state.isStop()) {
+            return TaskExecutionStatus.KILL;
+        } else if (state.isPause()) {
+            return TaskExecutionStatus.PAUSE;
+        } else {
+            // todo: should handle 'BLOCK' state?
+            logger.error("Can't transform workflow state: {}", state);
+            throw new IllegalArgumentException(String.format("Can't transform workflow state: %s", state.name()));
+        }
+    }
+
     /**
      * get the params from subProcessInstance to this subProcessTask
      */
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 1645a1aa44..dacb63a92e 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -150,6 +150,60 @@ public class SubProcessTaskTest {
         Assert.assertEquals(TaskExecutionStatus.SUCCESS, status);
     }
 
+    @Test
+    public void testStop() {
+        TaskInstance taskInstance = testBasicInit(WorkflowExecutionStatus.STOP);
+        taskInstance.setVarPool(getProperty());
+        taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+                "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+                "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+                "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+                "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+                "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+                "\"waitStartTimeout\":{},\"switchResult\":{}}");
+        SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+        subTaskProcessor.init(taskInstance, processInstance);
+        subTaskProcessor.action(TaskAction.RUN);
+        TaskExecutionStatus status = taskInstance.getState();
+        Assert.assertEquals(TaskExecutionStatus.KILL, status);
+    }
+
+    @Test
+    public void testFail() {
+        TaskInstance taskInstance = testBasicInit(WorkflowExecutionStatus.FAILURE);
+        taskInstance.setVarPool(getProperty());
+        taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+                "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+                "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+                "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+                "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+                "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+                "\"waitStartTimeout\":{},\"switchResult\":{}}");
+        SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+        subTaskProcessor.init(taskInstance, processInstance);
+        subTaskProcessor.action(TaskAction.RUN);
+        TaskExecutionStatus status = taskInstance.getState();
+        Assert.assertEquals(TaskExecutionStatus.FAILURE, status);
+    }
+
+    @Test
+    public void testPAUSE() {
+        TaskInstance taskInstance = testBasicInit(WorkflowExecutionStatus.PAUSE);
+        taskInstance.setVarPool(getProperty());
+        taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+                "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+                "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+                "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+                "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+                "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+                "\"waitStartTimeout\":{},\"switchResult\":{}}");
+        SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+        subTaskProcessor.init(taskInstance, processInstance);
+        subTaskProcessor.action(TaskAction.RUN);
+        TaskExecutionStatus status = taskInstance.getState();
+        Assert.assertEquals(TaskExecutionStatus.PAUSE, status);
+    }
+
     private String getProperty() {
         List<Property> varPools = new ArrayList<>();
         Property property = new Property();

Reply via email to