This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b1d57dbce4 Check the status of the child process when the parent
process is running (#9567)
b1d57dbce4 is described below
commit b1d57dbce491d13d29ca44ebab776c49fa3656c7
Author: WangJPLeo <[email protected]>
AuthorDate: Mon Apr 18 20:27:11 2022 +0800
Check the status of the child process when the parent process is running
(#9567)
Co-authored-by: WangJPLeo <[email protected]>
---
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../api/service/ExecutorService.java | 7 +++
.../api/service/impl/ExecutorServiceImpl.java | 57 +++++++++++++++-------
.../api/service/ExecutorServiceTest.java | 8 +++
.../dao/mapper/ProcessTaskRelationMapper.java | 8 +++
.../dao/mapper/ProcessTaskRelationMapper.xml | 7 +++
6 files changed, 71 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 81802d5f9f..4954adf431 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -242,6 +242,7 @@ public enum Status {
PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists",
"工作流实例[{0}]已存在"),
PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist",
"工作流定义[{0}]不存在"),
PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} not on line",
"工作流定义[{0}]不是上线状态"),
+ SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not on
line", "存在子工作流定义不是上线状态"),
PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance
{0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"),
PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process
instance {0} is {1},Cannot perform {2} operation",
"工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),
SUB_PROCESS_INSTANCE_NOT_EXIST(50007, "the task belong to process instance
does not exist", "子工作流实例不存在"),
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 1087d595eb..2c9fcff7a9 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -95,4 +95,11 @@ public interface ExecutorService {
* @return check result code
*/
Map<String, Object> startCheckByProcessDefinedCode(long
processDefinitionCode);
+
+ /**
+ * check if the current process has subprocesses and all subprocesses are
valid
+ * @param processDefinition
+ * @return check result
+ */
+ boolean checkSubProcessDefinitionValid(ProcessDefinition
processDefinition);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 4486824314..33cc6a2fbe 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -44,18 +44,9 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
@@ -67,11 +58,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -110,6 +97,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
@Autowired
StateEventCallbackService stateEventCallbackService;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Autowired
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
/**
* execute process instance
*
@@ -226,12 +219,42 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE)
{
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
processDefineCode);
+ } else if (!checkSubProcessDefinitionValid(processDefinition)){
+ // check sub process definition online
+ putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
return result;
}
+ /**
+ * check if the current process has subprocesses and all subprocesses are
valid
+ * @param processDefinition
+ * @return check result
+ */
+ @Override
+ public boolean checkSubProcessDefinitionValid(ProcessDefinition
processDefinition) {
+ // query all subprocesses under the current process
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
+ if (processTaskRelations.isEmpty()){
+ return true;
+ }
+ Set<Long> relationCodes =
processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
+ List<TaskDefinition> taskDefinitions =
taskDefinitionMapper.queryByCodeList(relationCodes);
+
+ // find out the process definition code
+ Set<Long> processDefinitionCodeSet = new HashSet<>();
+ taskDefinitions.stream()
+ .filter(task ->
TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType()))
+ .forEach(taskDefinition ->
processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(),
Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
+
+ // check sub releaseState
+ List<ProcessDefinition> processDefinitions =
processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
+ return processDefinitions.stream().filter(definition ->
definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
+ }
+
+
/**
* do action to process instance:pause, stop, repeat, recover from pause,
recover from stop
*
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index cc6fc86e25..3f2bd02575 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -40,7 +40,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -78,6 +80,12 @@ public class ExecutorServiceTest {
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
+ @Mock
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
+ @Mock
+ private TaskDefinitionMapper taskDefinitionMapper;
+
@Mock
private ProjectMapper projectMapper;
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 264c40e080..c14f4b4db8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -180,4 +180,12 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
+ /**
+ * query downstream process task relation by processDefinitionCode
+ * @param processDefinitionCode
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation>
queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long
processDefinitionCode);
+
+
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 4598ec0ead..e25fedefbb 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -69,6 +69,13 @@
#{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime})
</foreach>
</insert>
+ <select id="queryDownstreamByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ where process_definition_code = #{processDefinitionCode}
+ and pre_task_version >= 1
+ </select>
<select id="queryDownstreamByTaskCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>