This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
new 1e029fbad7 [Fix-14303][Master] Workflow with sub_process task can't be
stopped w… (#14343)
1e029fbad7 is described below
commit 1e029fbad7d091f268397b1d51cfdfedcf3af786
Author: HomminLee <[email protected]>
AuthorDate: Wed Jul 19 11:13:21 2023 +0800
[Fix-14303][Master] Workflow with sub_process task can't be stopped w…
(#14343)
* [Fix-14303][Master] Workflow with sub_process task can't be stopped when
running
* add test
* fix error from cherry-pick
---
.../master/runner/WorkflowExecuteRunnable.java | 2 +-
.../master/runner/task/SubTaskProcessor.java | 19 +++++++-
.../server/master/SubProcessTaskTest.java | 54 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 4556767a88..8bbd0bb064 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -606,7 +606,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
Date scheduleDate = processInstance.getScheduleTime();
if (scheduleDate == null) {
if (CollectionUtils.isEmpty(complementListDate)) {
- log.info("complementListDate is empty, process complement end.
process id:{}", processInstance.getId());
+ logger.info("complementListDate is empty, process complement
end. process id:{}", processInstance.getId());
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 8fe41da9e7..e53dfa0d6a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -129,14 +129,29 @@ public class SubTaskProcessor extends BaseTaskProcessor {
subProcessInstance.getId(),
subProcessInstance.getState());
if (subProcessInstance != null &&
subProcessInstance.getState().isFinished()) {
- // todo: check the status and transform
-
taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode()));
+
taskInstance.setState(subProcessFinishedStateTransform(subProcessInstance.getState()));
taskInstance.setEndTime(new Date());
dealFinish();
processService.saveTaskInstance(taskInstance);
}
}
+ private TaskExecutionStatus
subProcessFinishedStateTransform(WorkflowExecutionStatus state) {
+ if (state.isSuccess()) {
+ return TaskExecutionStatus.SUCCESS;
+ } else if (state.isFailure()) {
+ return TaskExecutionStatus.FAILURE;
+ } else if (state.isStop()) {
+ return TaskExecutionStatus.KILL;
+ } else if (state.isPause()) {
+ return TaskExecutionStatus.PAUSE;
+ } else {
+ // todo: should handle 'BLOCK' state?
+ logger.error("Can't transform workflow state: {}", state);
+ throw new IllegalArgumentException(String.format("Can't transform
workflow state: %s", state.name()));
+ }
+ }
+
/**
* get the params from subProcessInstance to this subProcessTask
*/
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 1645a1aa44..dacb63a92e 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -150,6 +150,60 @@ public class SubProcessTaskTest {
Assert.assertEquals(TaskExecutionStatus.SUCCESS, status);
}
+ @Test
+ public void testStop() {
+ TaskInstance taskInstance =
testBasicInit(WorkflowExecutionStatus.STOP);
+ taskInstance.setVarPool(getProperty());
+ taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+ "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+ "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+ "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+ "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+ "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+ "\"waitStartTimeout\":{},\"switchResult\":{}}");
+ SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+ subTaskProcessor.init(taskInstance, processInstance);
+ subTaskProcessor.action(TaskAction.RUN);
+ TaskExecutionStatus status = taskInstance.getState();
+ Assert.assertEquals(TaskExecutionStatus.KILL, status);
+ }
+
+ @Test
+ public void testFail() {
+ TaskInstance taskInstance =
testBasicInit(WorkflowExecutionStatus.FAILURE);
+ taskInstance.setVarPool(getProperty());
+ taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+ "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+ "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+ "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+ "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+ "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+ "\"waitStartTimeout\":{},\"switchResult\":{}}");
+ SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+ subTaskProcessor.init(taskInstance, processInstance);
+ subTaskProcessor.action(TaskAction.RUN);
+ TaskExecutionStatus status = taskInstance.getState();
+ Assert.assertEquals(TaskExecutionStatus.FAILURE, status);
+ }
+
+ @Test
+ public void testPAUSE() {
+ TaskInstance taskInstance =
testBasicInit(WorkflowExecutionStatus.PAUSE);
+ taskInstance.setVarPool(getProperty());
+ taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
+ "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
+ "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
+ "{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
+ "\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+ "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
+ "\"waitStartTimeout\":{},\"switchResult\":{}}");
+ SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
+ subTaskProcessor.init(taskInstance, processInstance);
+ subTaskProcessor.action(TaskAction.RUN);
+ TaskExecutionStatus status = taskInstance.getState();
+ Assert.assertEquals(TaskExecutionStatus.PAUSE, status);
+ }
+
private String getProperty() {
List<Property> varPools = new ArrayList<>();
Property property = new Property();