This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
new cbafb7fe06 fix 13346 (#13411)
cbafb7fe06 is described below
commit cbafb7fe06f41114b0e6eb78f3c40be660f4e51b
Author: JinYong Li <[email protected]>
AuthorDate: Wed Jan 18 10:31:51 2023 +0800
fix 13346 (#13411)
Co-authored-by: JinyLeeChina <[email protected]>
---
.../api/service/impl/ExecutorServiceImpl.java | 2 +-
.../common/enums/ExecutionStatus.java | 20 ++++++++++++++------
.../dolphinscheduler/dao/entity/TaskInstance.java | 3 +--
.../server/master/runner/WorkflowExecuteThread.java | 19 +++++++++++--------
.../master/runner/task/CommonTaskProcessor.java | 2 +-
.../master/runner/task/ConditionTaskProcessor.java | 2 +-
.../master/runner/task/DependentTaskProcessor.java | 2 +-
.../master/runner/task/SwitchTaskProcessor.java | 2 +-
.../dolphinscheduler/spi/task/ExecutionStatus.java | 5 ++---
9 files changed, 33 insertions(+), 24 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index fe21842f72..50d0856cef 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -339,7 +339,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
break;
case RECOVER_SUSPENDED_PROCESS:
- if (executionStatus.typeIsPause() ||
executionStatus.typeIsCancel()) {
+ if (executionStatus.typeIsCancel()) {
checkResult = true;
}
break;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 637eab2a4c..b3fac4094f 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -99,8 +99,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFinished() {
- return typeIsSuccess() || typeIsFailure() || typeIsCancel() ||
typeIsPause()
- || typeIsStop();
+ return typeIsSuccess() || typeIsFailure() || typeIsCancel();
}
/**
@@ -113,12 +112,12 @@ public enum ExecutionStatus {
}
/**
- * status is pause
+ * status is ready pause
*
* @return status
*/
- public boolean typeIsPause() {
- return this == PAUSE;
+ public boolean typeIsReadyPause() {
+ return this == READY_PAUSE;
}
/**
@@ -145,7 +144,16 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsCancel() {
- return this == KILL || this == STOP;
+ return this == KILL || this == STOP || this == PAUSE;
+ }
+
+ /**
+ * status is ready cancel
+ *
+ * @return status
+ */
+ public boolean typeIsReadyCancel() {
+ return this == READY_PAUSE || this == READY_STOP;
}
public int getCode() {
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 4019f20508..e2b8034113 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -546,8 +546,7 @@ public class TaskInstance implements Serializable {
}
public boolean isTaskComplete() {
- return this.getState().typeIsPause()
- || this.getState().typeIsSuccess()
+ return this.getState().typeIsSuccess()
|| this.getState().typeIsCancel()
|| (this.getState().typeIsFailure() && !taskCanRetry());
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index cc6078a1e6..7d542157f1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -501,7 +501,7 @@ public class WorkflowExecuteThread implements Runnable {
logger.info("process:{} state {} change to {}",
processInstance.getId(), processInstance.getState(),
stateEvent.getExecutionStatus());
processInstance =
processService.findProcessInstanceById(this.processInstance.getId());
- if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+ if (stateEvent.getExecutionStatus().typeIsCancel()) {
this.updateProcessInstanceState(stateEvent);
return true;
}
@@ -529,7 +529,7 @@ public class WorkflowExecuteThread implements Runnable {
return false;
}
- if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+ if (processInstance.getState().typeIsReadyCancel()) {
return false;
}
@@ -984,7 +984,12 @@ public class WorkflowExecuteThread implements Runnable {
continue;
}
TaskInstance task = createTaskInstance(processInstance,
taskNodeObject);
- taskInstances.add(task);
+ if (processInstance.getState().typeIsReadyPause() &&
(!activeTaskProcessorMaps.isEmpty() || !completeTaskList.isEmpty())) {
+ task.setState(ExecutionStatus.PAUSE);
+ completeTaskList.put(String.valueOf(task.getTaskCode()), task);
+ } else {
+ taskInstances.add(task);
+ }
}
// if previous node success , post node submit
@@ -998,7 +1003,7 @@ public class WorkflowExecuteThread implements Runnable {
logger.info("task {} has already run success, task id:{}",
task.getName(), task.getId());
continue;
}
- if (task.getState().typeIsPause() ||
task.getState().typeIsCancel()) {
+ if (task.getState().typeIsCancel()) {
logger.info("task {} stopped, the state is {}, task id:{}",
task.getName(), task.getState(), task.getId());
} else {
addTaskToStandByList(task);
@@ -1030,7 +1035,7 @@ public class WorkflowExecuteThread implements Runnable {
return DependResult.WAITING;
}
ExecutionStatus depTaskState =
completeTaskList.get(depsNode).getState();
- if (depTaskState.typeIsPause() || depTaskState.typeIsCancel())
{
+ if (depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
// ignore task state if current task is condition
@@ -1198,9 +1203,7 @@ public class WorkflowExecuteThread implements Runnable {
}
List<TaskInstance> pauseList =
getCompleteTaskByState(ExecutionStatus.PAUSE);
- if (CollectionUtils.isNotEmpty(pauseList)
- || !isComplementEnd()
- || readyToSubmitTaskQueue.size() > 0) {
+ if (CollectionUtils.isNotEmpty(pauseList) || !isComplementEnd()) {
return ExecutionStatus.PAUSE;
} else {
return ExecutionStatus.SUCCESS;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 32d3e62165..e7e3a978d7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -106,7 +106,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
- if (taskInstance.getState().typeIsFinished() &&
!taskInstance.getState().typeIsCancel()) {
+ if (taskInstance.getState().typeIsSuccess() ||
taskInstance.getState().typeIsFailure()) {
return true;
}
taskInstance.setState(ExecutionStatus.KILL);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 20703904a5..41dd0b0649 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -123,7 +123,7 @@ public class ConditionTaskProcessor extends
BaseTaskProcessor {
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
- if (taskInstance.getState().typeIsFinished() &&
!taskInstance.getState().typeIsCancel()) {
+ if (taskInstance.getState().typeIsSuccess() ||
taskInstance.getState().typeIsFailure()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index d83d748c2d..16e330d000 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -154,7 +154,7 @@ public class DependentTaskProcessor extends
BaseTaskProcessor {
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
- if (taskInstance.getState().typeIsFinished() &&
!taskInstance.getState().typeIsCancel()) {
+ if (taskInstance.getState().typeIsSuccess() ||
taskInstance.getState().typeIsFailure()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 0829a98c7e..9cc00d3158 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -99,7 +99,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
- if (taskInstance.getState().typeIsFinished() &&
!taskInstance.getState().typeIsCancel()) {
+ if (taskInstance.getState().typeIsSuccess() ||
taskInstance.getState().typeIsFailure()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/ExecutionStatus.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/ExecutionStatus.java
index 836c0cd3d9..6f9503ed30 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/ExecutionStatus.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/ExecutionStatus.java
@@ -96,8 +96,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFinished() {
- return typeIsSuccess() || typeIsFailure() || typeIsCancel() ||
typeIsPause()
- || typeIsStop();
+ return typeIsSuccess() || typeIsFailure() || typeIsCancel();
}
/**
@@ -142,7 +141,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsCancel() {
- return this == KILL || this == STOP;
+ return this == KILL || this == STOP || this == PAUSE;
}
public int getCode() {