This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b1d57dbce4 Check the status of the child process when the parent 
process is running (#9567)
b1d57dbce4 is described below

commit b1d57dbce491d13d29ca44ebab776c49fa3656c7
Author: WangJPLeo <[email protected]>
AuthorDate: Mon Apr 18 20:27:11 2022 +0800

    Check the status of the child process when the parent process is running 
(#9567)
    
    Co-authored-by: WangJPLeo <[email protected]>
---
 .../apache/dolphinscheduler/api/enums/Status.java  |  1 +
 .../api/service/ExecutorService.java               |  7 +++
 .../api/service/impl/ExecutorServiceImpl.java      | 57 +++++++++++++++-------
 .../api/service/ExecutorServiceTest.java           |  8 +++
 .../dao/mapper/ProcessTaskRelationMapper.java      |  8 +++
 .../dao/mapper/ProcessTaskRelationMapper.xml       |  7 +++
 6 files changed, 71 insertions(+), 17 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 81802d5f9f..4954adf431 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -242,6 +242,7 @@ public enum Status {
     PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists", 
"工作流实例[{0}]已存在"),
     PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist", 
"工作流定义[{0}]不存在"),
     PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} not on line", 
"工作流定义[{0}]不是上线状态"),
+    SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not on 
line", "存在子工作流定义不是上线状态"),
     PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance 
{0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"),
     PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process 
instance {0} is {1},Cannot perform {2} operation", 
"工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),
     SUB_PROCESS_INSTANCE_NOT_EXIST(50007, "the task belong to process instance 
does not exist", "子工作流实例不存在"),
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 1087d595eb..2c9fcff7a9 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -95,4 +95,11 @@ public interface ExecutorService {
      * @return check result code
      */
     Map<String, Object> startCheckByProcessDefinedCode(long 
processDefinitionCode);
+
+    /**
+     * check if the current process has subprocesses and all subprocesses are 
valid
+     * @param processDefinition
+     * @return check result
+     */
+    boolean checkSubProcessDefinitionValid(ProcessDefinition 
processDefinition);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 4486824314..33cc6a2fbe 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -44,18 +44,9 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
@@ -67,11 +58,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -110,6 +97,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
     @Autowired
     StateEventCallbackService stateEventCallbackService;
 
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
     /**
      * execute process instance
      *
@@ -226,12 +219,42 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) 
{
             // check process definition online
             putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, 
processDefineCode);
+        } else if (!checkSubProcessDefinitionValid(processDefinition)){
+            // check sub process definition online
+            putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
         } else {
             result.put(Constants.STATUS, Status.SUCCESS);
         }
         return result;
     }
 
+    /**
+     * check if the current process has subprocesses and all subprocesses are 
valid
+     * @param processDefinition
+     * @return check result
+     */
+    @Override
+    public boolean checkSubProcessDefinitionValid(ProcessDefinition 
processDefinition) {
+        // query all subprocesses under the current process
+        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
+        if (processTaskRelations.isEmpty()){
+            return true;
+        }
+        Set<Long> relationCodes = 
processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(relationCodes);
+
+        // find out the process definition code
+        Set<Long> processDefinitionCodeSet = new HashSet<>();
+        taskDefinitions.stream()
+                .filter(task -> 
TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType()))
+                .forEach(taskDefinition -> 
processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(),
 Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
+
+        // check sub releaseState
+        List<ProcessDefinition> processDefinitions = 
processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
+        return processDefinitions.stream().filter(definition -> 
definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
+    }
+
+
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, 
recover from stop
      *
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index cc6fc86e25..3f2bd02575 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -40,7 +40,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -78,6 +80,12 @@ public class ExecutorServiceTest {
     @Mock
     private ProcessDefinitionMapper processDefinitionMapper;
 
+    @Mock
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
+    @Mock
+    private TaskDefinitionMapper taskDefinitionMapper;
+
     @Mock
     private ProjectMapper projectMapper;
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 264c40e080..c14f4b4db8 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -180,4 +180,12 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
                     @Param("preTaskCode") long preTaskCode,
                     @Param("postTaskCode") long postTaskCode);
 
+    /**
+     * query downstream process task relation by processDefinitionCode
+     * @param processDefinitionCode
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> 
queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long 
processDefinitionCode);
+
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 4598ec0ead..e25fedefbb 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -69,6 +69,13 @@
             
#{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime})
         </foreach>
     </insert>
+    <select id="queryDownstreamByProcessDefinitionCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        where process_definition_code = #{processDefinitionCode}
+        and pre_task_version >= 1
+    </select>
     <select id="queryDownstreamByTaskCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
         select
         <include refid="baseSql"/>

Reply via email to