This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 891a002  [Feature-#8373][MasterServer] Dependent tasks can re-run automatically in the case of complement (#8496)
891a002 is described below

commit 891a002beac4b429e20c03aeae77da4c72c39f6c
Author: xiangzihao <[email protected]>
AuthorDate: Fri Feb 25 10:34:02 2022 +0800

    [Feature-#8373][MasterServer] Dependent tasks can re-run automatically in the case of complement (#8496)
    
    * first add feature_8373
    
    * fix code smell
    
    * add blank line
    
    * fix some problems
    
    * fix unit test error
---
 .../api/controller/ExecutorController.java         |  34 ++++-
 .../api/service/ExecutorService.java               |   4 +-
 .../api/service/impl/ExecutorServiceImpl.java      | 123 +++++++++++++++--
 .../main/resources/i18n/messages_en_US.properties  |   2 +
 .../main/resources/i18n/messages_zh_CN.properties  |   2 +
 .../api/controller/ExecutorControllerTest.java     |  12 +-
 .../api/service/ExecutorServiceTest.java           |  22 ++-
 .../common/enums/ComplementDependentMode.java      |  49 +++++++
 .../dao/entity/DependentProcessDefinition.java     | 149 +++++++++++++++++++++
 .../dao/mapper/ScheduleMapper.java                 |   8 ++
 .../dao/mapper/WorkFlowLineageMapper.java          |   9 ++
 .../dolphinscheduler/dao/mapper/ScheduleMapper.xml |  11 ++
 .../dao/mapper/WorkFlowLineageMapper.xml           |  17 +++
 .../server/PythonGatewayServer.java                |   5 +-
 .../service/process/ProcessService.java            |  28 ++++
 15 files changed, 443 insertions(+), 32 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 45bfecc..10c6177 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -29,6 +29,7 @@ import 
org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -107,7 +108,9 @@ public class ExecutorController extends BaseController {
         @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
         @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
         @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
-        @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+        @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"),
+        @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = 
"Int", example = "0"),
+        @ApiImplicitParam(name = "complementDependentMode", value = 
"COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
     })
     @PostMapping(value = "start-process-instance")
     @ResponseStatus(HttpStatus.OK)
@@ -130,7 +133,8 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "timeout", 
required = false) Integer timeout,
                                        @RequestParam(value = "startParams", 
required = false) String startParams,
                                        @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
-                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun,
+                                       @RequestParam(value = 
"complementDependentMode", required = false) ComplementDependentMode 
complementDependentMode) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -139,8 +143,15 @@ public class ExecutorController extends BaseController {
         if (startParams != null) {
             startParamMap = JSONUtils.toMap(startParams);
         }
-        Map<String, Object> result = 
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, 
scheduleTime, execType, failureStrategy,
-            startNodeList, taskDependType, warningType, warningGroupId, 
runMode, processInstancePriority, workerGroup, environmentCode,timeout, 
startParamMap, expectedParallelismNumber, dryRun);
+
+        if (complementDependentMode == null) {
+            complementDependentMode = ComplementDependentMode.OFF_MODE;
+        }
+
+        Map<String, Object> result = 
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
+                scheduleTime, execType, failureStrategy,
+                startNodeList, taskDependType, warningType, warningGroupId, 
runMode, processInstancePriority,
+                workerGroup, environmentCode, timeout, startParamMap, 
expectedParallelismNumber, dryRun, complementDependentMode);
         return returnDataList(result);
     }
 
@@ -181,7 +192,9 @@ public class ExecutorController extends BaseController {
             @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
             @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
             @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
-            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
+            @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = 
"Int", example = "0"),
+            @ApiImplicitParam(name = "complementDependentMode", value = 
"COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
     })
     @PostMapping(value = "batch-start-process-instance")
     @ResponseStatus(HttpStatus.OK)
@@ -204,7 +217,8 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "timeout", 
required = false) Integer timeout,
                                        @RequestParam(value = "startParams", 
required = false) String startParams,
                                        @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
-                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun,
+                                       @RequestParam(value = 
"complementDependentMode", required = false) ComplementDependentMode 
complementDependentMode) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -215,6 +229,10 @@ public class ExecutorController extends BaseController {
             startParamMap = JSONUtils.toMap(startParams);
         }
 
+        if (complementDependentMode == null) {
+            complementDependentMode = ComplementDependentMode.OFF_MODE;
+        }
+
         Map<String, Object> result = new HashMap<>();
         List<String> processDefinitionCodeArray = 
Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
         List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
@@ -224,7 +242,9 @@ public class ExecutorController extends BaseController {
         for (String strProcessDefinitionCode : processDefinitionCodeArray) {
             long processDefinitionCode = 
Long.parseLong(strProcessDefinitionCode);
             result = execService.execProcessInstance(loginUser, projectCode, 
processDefinitionCode, scheduleTime, execType, failureStrategy,
-                    startNodeList, taskDependType, warningType, 
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, 
timeout, startParamMap, expectedParallelismNumber, dryRun);
+                    startNodeList, taskDependType, warningType, 
warningGroupId, runMode, processInstancePriority,
+                    workerGroup, environmentCode, timeout, startParamMap, 
expectedParallelismNumber, dryRun,
+                    complementDependentMode);
 
             if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
                 
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 2fa065b..1087d59 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -63,7 +64,8 @@ public interface ExecutorService {
                                             RunMode runMode,
                                             Priority processInstancePriority, 
String workerGroup, Long environmentCode, Integer timeout,
                                             Map<String, String> startParams, 
Integer expectedParallelismNumber,
-                                            int dryRun);
+                                            int dryRun,
+                                            ComplementDependentMode 
complementDependentMode);
 
     /**
      * check whether the process definition can be executed
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 5991e79..6dc8ab7 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
 
+import org.apache.commons.beanutils.BeanUtils;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
@@ -31,6 +32,8 @@ import org.apache.dolphinscheduler.api.service.MonitorService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -44,6 +47,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -66,9 +70,9 @@ import org.apache.commons.lang.StringUtils;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,11 +101,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
     @Autowired
     private MonitorService monitorService;
 
-
     @Autowired
     private ProcessInstanceMapper processInstanceMapper;
 
-
     @Autowired
     private ProcessService processService;
 
@@ -138,7 +140,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                                                    RunMode runMode,
                                                    Priority 
processInstancePriority, String workerGroup, Long environmentCode,Integer 
timeout,
                                                    Map<String, String> 
startParams, Integer expectedParallelismNumber,
-                                                   int dryRun) {
+                                                   int dryRun,
+                                                   ComplementDependentMode 
complementDependentMode) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -175,7 +178,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
          */
         int create = this.createCommand(commandType, 
processDefinition.getCode(),
                 taskDependType, failureStrategy, startNodeList, cronTime, 
warningType, loginUser.getId(),
-                warningGroupId, runMode, processInstancePriority, workerGroup, 
environmentCode, startParams, expectedParallelismNumber, dryRun);
+                warningGroupId, runMode, processInstancePriority, workerGroup, 
environmentCode, startParams,
+                expectedParallelismNumber, dryRun, complementDependentMode);
 
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
@@ -536,7 +540,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                               String startNodeList, String schedule, 
WarningType warningType,
                               int executorId, int warningGroupId,
                               RunMode runMode, Priority 
processInstancePriority, String workerGroup, Long environmentCode,
-                              Map<String, String> startParams, Integer 
expectedParallelismNumber, int dryRun) {
+                              Map<String, String> startParams, Integer 
expectedParallelismNumber, int dryRun, ComplementDependentMode 
complementDependentMode) {
 
         /**
          * instantiate command schedule instance
@@ -599,7 +603,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             if (start == null || end == null) {
                 return 0;
             }
-            return createComplementCommandList(start, end, runMode, command, 
expectedParallelismNumber);
+            return createComplementCommandList(start, end, runMode, command, 
expectedParallelismNumber, complementDependentMode);
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
             return processService.createCommand(command);
@@ -608,15 +612,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
 
     /**
      * create complement command
-     * close left open right
+     * close left and close right
      *
      * @param start
      * @param end
      * @param runMode
      * @return
      */
-    private int createComplementCommandList(Date start, Date end, RunMode 
runMode, Command command, Integer expectedParallelismNumber) {
+    private int createComplementCommandList(Date start, Date end, RunMode 
runMode, Command command,
+                                            Integer expectedParallelismNumber, 
ComplementDependentMode complementDependentMode) {
         int createCount = 0;
+        int dependentProcessDefinitionCreateCount = 0;
+
         runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
         Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
         switch (runMode) {
@@ -629,6 +636,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                 cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(end));
                 command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 createCount = processService.createCommand(command);
+
+                // dependent process definition
+                List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+                if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
+                    logger.info("process code: {} complement dependent in off 
mode or schedule's size is 0, skip "
+                            + "dependent complement data", 
command.getProcessDefinitionCode());
+                } else {
+                    dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
+                }
+
                 break;
             }
             case RUN_MODE_PARALLEL: {
@@ -637,7 +655,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                     break;
                 }
 
-                LinkedList<Date> listDate = new LinkedList<>();
+                List<Date> listDate = new ArrayList<>();
                 List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                 listDate.addAll(CronUtils.getSelfFireDateList(start, end, 
schedules));
                 int listDateSize = listDate.size();
@@ -650,7 +668,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                         }
                     }
                     logger.info("In parallel mode, current 
expectedParallelismNumber:{}", createCount);
-
+                    
                     // Distribute the number of tasks equally to each command.
                     // The last command with insufficient quantity will be 
assigned to the remaining tasks.
                     int itemsPerCommand = (listDateSize / createCount);
@@ -673,6 +691,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(listDate.get(endDateIndex)));
                         
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
+
+                        if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
+                            logger.info("process code: {} complement dependent 
in off mode or schedule's size is 0, skip "
+                                    + "dependent complement data", 
command.getProcessDefinitionCode());
+                        } else {
+                            dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
+                        }
                     }
                 }
                 break;
@@ -680,7 +705,81 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             default:
                 break;
         }
-        logger.info("create complement command count: {}", createCount);
+        logger.info("create complement command count: {}, create dependent 
complement command count: {}", createCount
+                , dependentProcessDefinitionCreateCount);
         return createCount;
     }
+
+    /**
+     * create complement dependent command
+     */
+    private int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
+        int dependentProcessDefinitionCreateCount = 0;
+        Command dependentCommand;
+
+        try {
+            dependentCommand = (Command) BeanUtils.cloneBean(command);
+        } catch (Exception e) {
+            logger.error("copy dependent command error: ", e);
+            return dependentProcessDefinitionCreateCount;
+        }
+
+        List<DependentProcessDefinition> dependentProcessDefinitionList =
+                
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
+                        CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
+                        dependentCommand.getWorkerGroup());
+
+        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
+        for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
+            
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
+            Map<String, String> cmdParam = 
JSONUtils.toMap(dependentCommand.getCommandParam());
+            cmdParam.put(CMD_PARAM_START_NODES, 
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
+            dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+            dependentProcessDefinitionCreateCount += 
processService.createCommand(dependentCommand);
+        }
+
+        return dependentProcessDefinitionCreateCount;
+    }
+
+    /**
+     * get complement dependent process definition list
+     */
+    private List<DependentProcessDefinition> 
getComplementDependentDefinitionList(long processDefinitionCode,
+                                                                               
CycleEnum processDefinitionCycle,
+                                                                               
String workerGroup) {
+        List<DependentProcessDefinition> dependentProcessDefinitionList =
+                
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+
+        return 
checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup);
+    }
+
+    /**
+     *  Check whether the dependency cycle of the dependent node is consistent 
with the schedule cycle of
+     *  the dependent process definition and if there is no worker group in 
the schedule, use the complement selection's
+     *  worker group
+     */
+    private List<DependentProcessDefinition> 
checkDependentProcessDefinitionValid(List<DependentProcessDefinition> 
dependentProcessDefinitionList,
+                                                                             
CycleEnum processDefinitionCycle,
+                                                                             
String workerGroup) {
+        List<DependentProcessDefinition> validDependentProcessDefinitionList = 
new ArrayList<>();
+
+        List<Long> processDefinitionCodeList = 
dependentProcessDefinitionList.stream()
+                .map(DependentProcessDefinition::getProcessDefinitionCode)
+                .collect(Collectors.toList());
+
+        Map<Long, String> processDefinitionWorkerGroupMap = 
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
+
+        for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
+            if (dependentProcessDefinition.getDependentCycle() == 
processDefinitionCycle) {
+                if 
(processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode())
 == null) {
+                    dependentProcessDefinition.setWorkerGroup(workerGroup);
+                }
+
+                
validDependentProcessDefinitionList.add(dependentProcessDefinition);
+            }
+        }
+
+        return validDependentProcessDefinitionList;
+    }
 }
diff --git 
a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties 
b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 5c568ae..752313e 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -179,6 +179,8 @@ PROCESS_INSTANCE_END_TIME=process instance end time
 PROCESS_INSTANCE_SIZE=process instance size
 PROCESS_INSTANCE_PRIORITY=process instance priority
 EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task 
threads
+DRY_RUN=dry run
+COMPLEMENT_DEPENDENT_MODE=complement dependent mode
 UPDATE_SCHEDULE_NOTES=update schedule
 SCHEDULE_ID=schedule id
 ONLINE_SCHEDULE_NOTES=online schedule
diff --git 
a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties 
b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index 033621c..9492795 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -165,6 +165,8 @@ RECEIVERS_CC=收件人(抄送)
 WORKER_GROUP_ID=Worker Server分组ID
 PROCESS_INSTANCE_PRIORITY=流程实例优先级
 EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度
+DRY_RUN=是否空跑
+COMPLEMENT_DEPENDENT_MODE=补数依赖的类型
 UPDATE_SCHEDULE_NOTES=更新定时
 SCHEDULE_ID=定时ID
 ONLINE_SCHEDULE_NOTES=定时上线
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
index c7c4b82..2d5c03a 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
@@ -66,6 +66,7 @@ public class ExecutorControllerTest extends 
AbstractControllerTest {
     final ImmutableMap<String, String> startParams = ImmutableMap.of("start", 
"params");
     final Integer expectedParallelismNumber = 6;
     final int dryRun = 7;
+    final ComplementDependentMode complementDependentMode = 
ComplementDependentMode.OFF_MODE;
 
     final JsonObject expectResponseContent = gson
             .fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test 
Data\",\"success\":true,\"failed\":false}"
@@ -102,7 +103,7 @@ public class ExecutorControllerTest extends 
AbstractControllerTest {
         when(executorService.execProcessInstance(any(User.class), 
eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), 
eq(startNodeList), eq(taskDependType), eq(warningType),
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), 
eq(workerGroup), eq(environmentCode),
-                eq(timeout), eq(startParams), eq(expectedParallelismNumber), 
eq(dryRun)))
+                eq(timeout), eq(startParams), eq(expectedParallelismNumber), 
eq(dryRun), eq(complementDependentMode)))
                 .thenReturn(executeServiceResult);
 
         //When
@@ -141,7 +142,8 @@ public class ExecutorControllerTest extends 
AbstractControllerTest {
         when(executorService.execProcessInstance(any(User.class), 
eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), 
eq(startNodeList), eq(taskDependType), eq(warningType),
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), 
eq(workerGroup), eq(environmentCode),
-                eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), 
eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult);
+                eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), 
eq(expectedParallelismNumber), eq(dryRun),
+                eq(complementDependentMode))).thenReturn(executeServiceResult);
 
         //When
         final MvcResult mvcResult = 
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
 projectCode)
@@ -178,7 +180,8 @@ public class ExecutorControllerTest extends 
AbstractControllerTest {
         when(executorService.execProcessInstance(any(User.class), 
eq(projectCode), eq(processDefinitionCode),
                 eq(scheduleTime), eq(execType), eq(failureStrategy), 
eq(startNodeList), eq(taskDependType), eq(warningType),
                 eq(warningGroupId), eq(runMode), eq(processInstancePriority), 
eq(workerGroup), eq(environmentCode),
-                eq(timeout), eq(null), eq(expectedParallelismNumber), 
eq(dryRun))).thenReturn(executeServiceResult);
+                eq(timeout), eq(null), eq(expectedParallelismNumber), 
eq(dryRun),
+                eq(complementDependentMode))).thenReturn(executeServiceResult);
 
         //When
         final MvcResult mvcResult = 
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
 projectCode)
@@ -203,7 +206,8 @@ public class ExecutorControllerTest extends 
AbstractControllerTest {
         when(executorService.execProcessInstance(any(User.class), 
eq(projectCode), eq(processDefinitionCode),
                                eq(null), eq(null), eq(failureStrategy), 
eq(null), eq(null), eq(warningType),
                 eq(0), eq(null), eq(null), eq("default"), eq(-1L),
-                eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), 
eq(0))).thenReturn(executeServiceResult);
+                eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
+                eq(complementDependentMode))).thenReturn(executeServiceResult);
 
                //When
         final MvcResult mvcResult = 
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
 projectCode)
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 90fb173..2e4c92f 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -29,6 +29,7 @@ import 
org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -166,7 +167,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -184,7 +186,8 @@ public class ExecutorServiceTest {
                 null, "n1,n2",
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -202,7 +205,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, 
result.get(Constants.STATUS));
         verify(processService, times(0)).createCommand(any(Command.class));
     }
@@ -219,7 +223,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
     }
@@ -236,7 +241,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(31)).createCommand(any(Command.class));
 
@@ -254,7 +260,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
15, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
15, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(15)).createCommand(any(Command.class));
 
@@ -269,7 +276,8 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO,
+                ComplementDependentMode.OFF_MODE);
         Assert.assertEquals(result.get(Constants.STATUS), 
Status.MASTER_NOT_EXISTS);
 
     }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
new file mode 100644
index 0000000..68f8e57
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+/**
+ * complement dependent mode: whether a complement run also runs dependent processes
+ */
+public enum ComplementDependentMode {
+    /**
+     * 0 off mode
+     * 1 run complement data with all dependent process
+     */
+    OFF_MODE(0,"off mode"),
+    ALL_DEPENDENT(1,"all dependent");
+
+    ComplementDependentMode(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    @EnumValue // MyBatis-Plus annotation; presumably marks code as the persisted value - confirm
+    private final int code;
+    private final String desc;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
new file mode 100644
index 0000000..3952b40
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.util.List;
+
+/**
+ * dependent process definition: a process definition plus the code and params of one of its task definitions
+ */
+public class DependentProcessDefinition {
+
+    /**
+     * process definition code
+     */
+    private long processDefinitionCode;
+
+    /**
+     * process definition name
+     */
+    private String processDefinitionName;
+
+    /**
+     * task definition code
+     */
+    private long taskDefinitionCode;
+
+    /**
+     * task definition params
+     */
+    private String taskParams;
+
+    /**
+     * schedule worker group
+     */
+    private String workerGroup;
+
+    /**
+     * get the cycle of the first dependent item whose definition code equals this process definition code
+     * @return CycleEnum of that item (null if its cycle string is unrecognized), or CycleEnum.DAY when no item matches
+     */
+    public CycleEnum getDependentCycle() {
+        DependentParameters dependentParameters = 
this.getDependentParameters();
+        List<DependentTaskModel> dependentTaskModelList = 
dependentParameters.getDependTaskList();
+
+        for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
+            List<DependentItem> dependentItemList = 
dependentTaskModel.getDependItemList();
+            for (DependentItem dependentItem : dependentItemList) { // NOTE(review): the check below uses the code of this definition itself; confirm the upstream code was not intended
+                if (this.getProcessDefinitionCode() == 
dependentItem.getDefinitionCode()) {
+                    return cycle2CycleEnum(dependentItem.getCycle());
+                }
+            }
+        }
+
+        return CycleEnum.DAY; // fallback when no dependent item matched
+    }
+
+    public CycleEnum cycle2CycleEnum(String cycle) {
+        CycleEnum cycleEnum = null; // stays null unless cycle is day, hour, week or month
+
+        switch (cycle) { // NOTE(review): switch on a null string throws NullPointerException; confirm cycle is never null
+            case "day":
+                cycleEnum = CycleEnum.DAY;
+                break;
+            case "hour":
+                cycleEnum =  CycleEnum.HOUR;
+                break;
+            case "week":
+                cycleEnum =  CycleEnum.WEEK;
+                break;
+            case "month":
+                cycleEnum =  CycleEnum.MONTH;
+                break;
+            default:
+                break;
+        }
+        return cycleEnum;
+    }
+
+    public DependentParameters getDependentParameters() {
+        return JSONUtils.parseObject(getDependence(), 
DependentParameters.class);
+    }
+
+    public String getDependence() {
+        return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
+    }
+
+    public String getProcessDefinitionName() {
+        return this.processDefinitionName;
+    }
+
+    public void setprocessDefinitionName(String name) { // NOTE(review): lower-case p deviates from the setProcessDefinitionName bean convention; confirm before renaming
+        this.processDefinitionName = name;
+    }
+
+    public long getProcessDefinitionCode() {
+        return this.processDefinitionCode;
+    }
+
+    public void setProcessDefinitionCode(long code) {
+        this.processDefinitionCode = code;
+    }
+
+    public long getTaskDefinitionCode() {
+        return this.taskDefinitionCode;
+    }
+
+    public void setTaskDefinitionCode(long code) {
+        this.taskDefinitionCode = code;
+    }
+
+    public String getTaskParams() {
+        return this.taskParams;
+    }
+
+    public void setTaskParams(String taskParams) {
+        this.taskParams = taskParams;
+    }
+
+    public String getWorkerGroup() {
+        return this.workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index d0b2d32..2af88fc 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -86,4 +86,12 @@ public interface ScheduleMapper extends BaseMapper<Schedule> 
{
      * @return schedule
      */
     Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long 
processDefinitionCode);
+
+    /**
+     * query schedules by a list of process definition codes
+     *
+     * @param processDefinitionCodeList processDefinitionCodeList
+     * @return schedule list
+     */
+    List<Schedule> 
querySchedulesByProcessDefinitionCodes(@Param("processDefinitionCodeList") 
List<Long> processDefinitionCodeList);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index 249e42a..314f542 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao.mapper;
 
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 
@@ -69,4 +70,12 @@ public interface WorkFlowLineageMapper {
      */
     List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long 
projectCode,
                                                    
@Param("processDefinitionCode") long processDefinitionCode);
+
+
+    /**
+     * query dependent process definitions by process definition code
+     * @param code process definition code
+     * @return dependent process definition list
+     */
+    List<DependentProcessDefinition> 
queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long 
code);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
index 85a26dd..b22ac9f 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
@@ -70,6 +70,17 @@
         from t_ds_schedules
         where process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="querySchedulesByProcessDefinitionCodes" 
resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+        select
+        <include refid="baseSql"/>
+        from t_ds_schedules
+        where process_definition_code in <!-- NOTE(review): with an empty processDefinitionCodeList the foreach presumably emits nothing and leaves a dangling "in"; confirm callers never pass an empty list -->
+        <foreach collection="processDefinitionCodeList" item="code" 
index="index" open="(" close=")" separator=",">
+            #{code}
+        </foreach>
+    </select>
+
     <select id="queryReleaseSchedulerListByProcessDefinitionCode"
             resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
         select
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 2d94b04..eed6476 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -88,4 +88,21 @@
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="queryDependentProcessDefinitionByProcessDefinitionCode" 
resultType="DependentProcessDefinition">
+        SELECT
+        c.code AS process_definition_code
+        ,c.name AS process_definition_name
+        ,a.code AS task_definition_code
+        ,a.task_params
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code    = b.pre_task_code and 
a.version = b.pre_task_version <!-- NOTE(review): joins on pre_task_code only, so a DEPENDENT task that never appears as a predecessor would presumably be skipped; confirm -->
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code 
AND c.version = b.process_definition_version AND c.project_code = b.project_code
+        WHERE 1=1
+        <if test="code != null and code != ''"> <!-- NOTE(review): code is a primitive long in the mapper interface; confirm this string-style emptiness test behaves as intended -->
+            AND a.task_params LIKE concat('%', #{code}, '%') <!-- NOTE(review): substring match on raw task_params text; a longer code containing these digits would presumably match too; confirm -->
+        </if>
+        AND a.task_type = 'DEPENDENT'
+    </select>
 </mapper>
diff --git 
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index cea26c7..9fa1366 100644
--- 
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.service.TenantService;
 import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@@ -94,6 +95,7 @@ public class PythonGatewayServer extends 
SpringBootServletInitializer {
     private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = 
TaskDependType.TASK_POST;
     private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
     private static final int DEFAULT_DRY_RUN = 0;
+    private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = 
ComplementDependentMode.OFF_MODE;
 
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
@@ -341,7 +343,8 @@ public class PythonGatewayServer extends 
SpringBootServletInitializer {
             timeout,
             null,
             null,
-            DEFAULT_DRY_RUN
+            DEFAULT_DRY_RUN,
+            COMPLEMENT_DEPENDENT_MODE
         );
     }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 43714e7..8fabaf9 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -60,6 +60,7 @@ import 
org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.DagData;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
 import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
 import org.apache.dolphinscheduler.dao.entity.DqRule;
@@ -114,6 +115,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -142,6 +144,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -256,6 +259,9 @@ public class ProcessService {
     @Autowired
     private TaskGroupMapper taskGroupMapper;
 
+    @Autowired
+    private WorkFlowLineageMapper workFlowLineageMapper;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in 
transaction
      *
@@ -1901,6 +1907,28 @@ public class ProcessService {
     }
 
     /**
+     * query schedules by process definition codes and map each process definition code to its worker group
+     * NOTE(review): Collectors.toMap throws on duplicate keys and on null values; confirm one schedule per code and non-null worker groups
+     * @param processDefinitionCodeList processDefinitionCodeList
+     * @see Schedule
+     */
+    public Map<Long, String> 
queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
+        List<Schedule> processDefinitionScheduleList = 
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
+        return 
processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
+                Schedule::getWorkerGroup));
+    }
+
+    /**
+     * query dependent process definition by process definition code
+     * delegates to WorkFlowLineageMapper and returns its result unchanged
+     * @param processDefinitionCode processDefinitionCode
+     * @see DependentProcessDefinition
+     */
+    public List<DependentProcessDefinition> 
queryDependentProcessDefinitionByProcessDefinitionCode(long 
processDefinitionCode) {
+        return 
workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+    }
+
+    /**
      * query need failover process instance
      *
      * @param host host

Reply via email to