This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 891a002 [Feature-#8373][MasterServer] Dependent tasks can re-run
automatically in the case of complement (#8496)
891a002 is described below
commit 891a002beac4b429e20c03aeae77da4c72c39f6c
Author: xiangzihao <[email protected]>
AuthorDate: Fri Feb 25 10:34:02 2022 +0800
[Feature-#8373][MasterServer] Dependent tasks can re-run automatically in
the case of complement (#8496)
* first add feature_8373
* fix code smell
* add blank line
* fix some problems
* fix unit test error
---
.../api/controller/ExecutorController.java | 34 ++++-
.../api/service/ExecutorService.java | 4 +-
.../api/service/impl/ExecutorServiceImpl.java | 123 +++++++++++++++--
.../main/resources/i18n/messages_en_US.properties | 2 +
.../main/resources/i18n/messages_zh_CN.properties | 2 +
.../api/controller/ExecutorControllerTest.java | 12 +-
.../api/service/ExecutorServiceTest.java | 22 ++-
.../common/enums/ComplementDependentMode.java | 49 +++++++
.../dao/entity/DependentProcessDefinition.java | 149 +++++++++++++++++++++
.../dao/mapper/ScheduleMapper.java | 8 ++
.../dao/mapper/WorkFlowLineageMapper.java | 9 ++
.../dolphinscheduler/dao/mapper/ScheduleMapper.xml | 11 ++
.../dao/mapper/WorkFlowLineageMapper.xml | 17 +++
.../server/PythonGatewayServer.java | 5 +-
.../service/process/ProcessService.java | 28 ++++
15 files changed, 443 insertions(+), 32 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 45bfecc..10c6177 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -29,6 +29,7 @@ import
org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -107,7 +108,9 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP",
dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType =
"Int", example = "100"),
- @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+ @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"),
+ @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType =
"Int", example = "0"),
+ @ApiImplicitParam(name = "complementDependentMode", value =
"COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@@ -130,7 +133,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "timeout",
required = false) Integer timeout,
@RequestParam(value = "startParams",
required = false) String startParams,
@RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber,
- @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun) {
+ @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun,
+ @RequestParam(value =
"complementDependentMode", required = false) ComplementDependentMode
complementDependentMode) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@@ -139,8 +143,15 @@ public class ExecutorController extends BaseController {
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
- Map<String, Object> result =
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
- startNodeList, taskDependType, warningType, warningGroupId,
runMode, processInstancePriority, workerGroup, environmentCode,timeout,
startParamMap, expectedParallelismNumber, dryRun);
+
+ if (complementDependentMode == null) {
+ complementDependentMode = ComplementDependentMode.OFF_MODE;
+ }
+
+ Map<String, Object> result =
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
+ scheduleTime, execType, failureStrategy,
+ startNodeList, taskDependType, warningType, warningGroupId,
runMode, processInstancePriority,
+ workerGroup, environmentCode, timeout, startParamMap,
expectedParallelismNumber, dryRun, complementDependentMode);
return returnDataList(result);
}
@@ -181,7 +192,9 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP",
dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType =
"Int", example = "100"),
- @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+ @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
+ @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType =
"Int", example = "0"),
+ @ApiImplicitParam(name = "complementDependentMode", value =
"COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
@@ -204,7 +217,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "timeout",
required = false) Integer timeout,
@RequestParam(value = "startParams",
required = false) String startParams,
@RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber,
- @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun) {
+ @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun,
+ @RequestParam(value =
"complementDependentMode", required = false) ComplementDependentMode
complementDependentMode) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@@ -215,6 +229,10 @@ public class ExecutorController extends BaseController {
startParamMap = JSONUtils.toMap(startParams);
}
+ if (complementDependentMode == null) {
+ complementDependentMode = ComplementDependentMode.OFF_MODE;
+ }
+
Map<String, Object> result = new HashMap<>();
List<String> processDefinitionCodeArray =
Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
@@ -224,7 +242,9 @@ public class ExecutorController extends BaseController {
for (String strProcessDefinitionCode : processDefinitionCodeArray) {
long processDefinitionCode =
Long.parseLong(strProcessDefinitionCode);
result = execService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, scheduleTime, execType, failureStrategy,
- startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,
timeout, startParamMap, expectedParallelismNumber, dryRun);
+ startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority,
+ workerGroup, environmentCode, timeout, startParamMap,
expectedParallelismNumber, dryRun,
+ complementDependentMode);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 2fa065b..1087d59 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -63,7 +64,8 @@ public interface ExecutorService {
RunMode runMode,
Priority processInstancePriority,
String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams,
Integer expectedParallelismNumber,
- int dryRun);
+ int dryRun,
+ ComplementDependentMode
complementDependentMode);
/**
* check whether the process definition can be executed
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 5991e79..6dc8ab7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
+import org.apache.commons.beanutils.BeanUtils;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
@@ -31,6 +32,8 @@ import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -44,6 +47,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -66,9 +70,9 @@ import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,11 +101,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
@Autowired
private MonitorService monitorService;
-
@Autowired
private ProcessInstanceMapper processInstanceMapper;
-
@Autowired
private ProcessService processService;
@@ -138,7 +140,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
RunMode runMode,
Priority
processInstancePriority, String workerGroup, Long environmentCode,Integer
timeout,
Map<String, String>
startParams, Integer expectedParallelismNumber,
- int dryRun) {
+ int dryRun,
+ ComplementDependentMode
complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -175,7 +178,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
*/
int create = this.createCommand(commandType,
processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime,
warningType, loginUser.getId(),
- warningGroupId, runMode, processInstancePriority, workerGroup,
environmentCode, startParams, expectedParallelismNumber, dryRun);
+ warningGroupId, runMode, processInstancePriority, workerGroup,
environmentCode, startParams,
+ expectedParallelismNumber, dryRun, complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@@ -536,7 +540,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
String startNodeList, String schedule,
WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority
processInstancePriority, String workerGroup, Long environmentCode,
- Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun) {
+ Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun, ComplementDependentMode
complementDependentMode) {
/**
* instantiate command schedule instance
@@ -599,7 +603,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
if (start == null || end == null) {
return 0;
}
- return createComplementCommandList(start, end, runMode, command,
expectedParallelismNumber);
+ return createComplementCommandList(start, end, runMode, command,
expectedParallelismNumber, complementDependentMode);
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
@@ -608,15 +612,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* create complement command
- * close left open right
+ * close left and close right
*
* @param start
* @param end
* @param runMode
* @return
*/
- private int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command, Integer expectedParallelismNumber) {
+ private int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command,
+ Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ int dependentProcessDefinitionCreateCount = 0;
+
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
switch (runMode) {
@@ -629,6 +636,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
+
+ // dependent process definition
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent in off
mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
+
break;
}
case RUN_MODE_PARALLEL: {
@@ -637,7 +655,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
break;
}
- LinkedList<Date> listDate = new LinkedList<>();
+ List<Date> listDate = new ArrayList<>();
List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(start, end,
schedules));
int listDateSize = listDate.size();
@@ -650,7 +668,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
}
logger.info("In parallel mode, current
expectedParallelismNumber:{}", createCount);
-
+
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be
assigned to the remaining tasks.
int itemsPerCommand = (listDateSize / createCount);
@@ -673,6 +691,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent
in off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
}
break;
@@ -680,7 +705,81 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
default:
break;
}
- logger.info("create complement command count: {}", createCount);
+ logger.info("create complement command count: {}, create dependent
complement command count: {}", createCount
+ , dependentProcessDefinitionCreateCount);
return createCount;
}
+
+ /**
+ * create complement dependent command
+ */
+ private int createComplementDependentCommand(List<Schedule> schedules,
Command command) {
+ int dependentProcessDefinitionCreateCount = 0;
+ Command dependentCommand;
+
+ try {
+ dependentCommand = (Command) BeanUtils.cloneBean(command);
+ } catch (Exception e) {
+ logger.error("copy dependent command error: ", e);
+ return dependentProcessDefinitionCreateCount;
+ }
+
+ List<DependentProcessDefinition> dependentProcessDefinitionList =
+
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
+ CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
+ dependentCommand.getWorkerGroup());
+
+ dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
+ for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
+
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
+ Map<String, String> cmdParam =
JSONUtils.toMap(dependentCommand.getCommandParam());
+ cmdParam.put(CMD_PARAM_START_NODES,
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
+ dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ dependentProcessDefinitionCreateCount +=
processService.createCommand(dependentCommand);
+ }
+
+ return dependentProcessDefinitionCreateCount;
+ }
+
+ /**
+ * get complement dependent process definition list
+ */
+ private List<DependentProcessDefinition>
getComplementDependentDefinitionList(long processDefinitionCode,
+
CycleEnum processDefinitionCycle,
+
String workerGroup) {
+ List<DependentProcessDefinition> dependentProcessDefinitionList =
+
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+
+ return
checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup);
+ }
+
+ /**
+ * Check whether the dependency cycle of the dependent node is consistent
with the schedule cycle of
+ * the dependent process definition and if there is no worker group in
the schedule, use the complement selection's
+ * worker group
+ */
+ private List<DependentProcessDefinition>
checkDependentProcessDefinitionValid(List<DependentProcessDefinition>
dependentProcessDefinitionList,
+
CycleEnum processDefinitionCycle,
+
String workerGroup) {
+ List<DependentProcessDefinition> validDependentProcessDefinitionList =
new ArrayList<>();
+
+ List<Long> processDefinitionCodeList =
dependentProcessDefinitionList.stream()
+ .map(DependentProcessDefinition::getProcessDefinitionCode)
+ .collect(Collectors.toList());
+
+ Map<Long, String> processDefinitionWorkerGroupMap =
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
+
+ for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
+ if (dependentProcessDefinition.getDependentCycle() ==
processDefinitionCycle) {
+ if
(processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode())
== null) {
+ dependentProcessDefinition.setWorkerGroup(workerGroup);
+ }
+
+
validDependentProcessDefinitionList.add(dependentProcessDefinition);
+ }
+ }
+
+ return validDependentProcessDefinitionList;
+ }
}
diff --git
a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 5c568ae..752313e 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -179,6 +179,8 @@ PROCESS_INSTANCE_END_TIME=process instance end time
PROCESS_INSTANCE_SIZE=process instance size
PROCESS_INSTANCE_PRIORITY=process instance priority
EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task
threads
+DRY_RUN=dry run
+COMPLEMENT_DEPENDENT_MODE=complement dependent mode
UPDATE_SCHEDULE_NOTES=update schedule
SCHEDULE_ID=schedule id
ONLINE_SCHEDULE_NOTES=online schedule
diff --git
a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index 033621c..9492795 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -165,6 +165,8 @@ RECEIVERS_CC=收件人(抄送)
WORKER_GROUP_ID=Worker Server分组ID
PROCESS_INSTANCE_PRIORITY=流程实例优先级
EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度
+DRY_RUN=是否空跑
+COMPLEMENT_DEPENDENT_MODE=补数依赖的类型
UPDATE_SCHEDULE_NOTES=更新定时
SCHEDULE_ID=定时ID
ONLINE_SCHEDULE_NOTES=定时上线
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
index c7c4b82..2d5c03a 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
@@ -66,6 +66,7 @@ public class ExecutorControllerTest extends
AbstractControllerTest {
final ImmutableMap<String, String> startParams = ImmutableMap.of("start",
"params");
final Integer expectedParallelismNumber = 6;
final int dryRun = 7;
+ final ComplementDependentMode complementDependentMode =
ComplementDependentMode.OFF_MODE;
final JsonObject expectResponseContent = gson
.fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test
Data\",\"success\":true,\"failed\":false}"
@@ -102,7 +103,7 @@ public class ExecutorControllerTest extends
AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority),
eq(workerGroup), eq(environmentCode),
- eq(timeout), eq(startParams), eq(expectedParallelismNumber),
eq(dryRun)))
+ eq(timeout), eq(startParams), eq(expectedParallelismNumber),
eq(dryRun), eq(complementDependentMode)))
.thenReturn(executeServiceResult);
//When
@@ -141,7 +142,8 @@ public class ExecutorControllerTest extends
AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority),
eq(workerGroup), eq(environmentCode),
- eq(Constants.MAX_TASK_TIMEOUT), eq(startParams),
eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult);
+ eq(Constants.MAX_TASK_TIMEOUT), eq(startParams),
eq(expectedParallelismNumber), eq(dryRun),
+ eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult =
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
projectCode)
@@ -178,7 +180,8 @@ public class ExecutorControllerTest extends
AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority),
eq(workerGroup), eq(environmentCode),
- eq(timeout), eq(null), eq(expectedParallelismNumber),
eq(dryRun))).thenReturn(executeServiceResult);
+ eq(timeout), eq(null), eq(expectedParallelismNumber),
eq(dryRun),
+ eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult =
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
projectCode)
@@ -203,7 +206,8 @@ public class ExecutorControllerTest extends
AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(null), eq(null), eq(failureStrategy),
eq(null), eq(null), eq(warningType),
eq(0), eq(null), eq(null), eq("default"), eq(-1L),
- eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null),
eq(0))).thenReturn(executeServiceResult);
+ eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
+ eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult =
mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance",
projectCode)
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 90fb173..2e4c92f 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -29,6 +29,7 @@ import
org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -166,7 +167,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@@ -184,7 +186,8 @@ public class ExecutorServiceTest {
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@@ -202,7 +205,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR,
result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
@@ -219,7 +223,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
@@ -236,7 +241,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
@@ -254,7 +260,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
15, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
15, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
@@ -269,7 +276,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ ComplementDependentMode.OFF_MODE);
Assert.assertEquals(result.get(Constants.STATUS),
Status.MASTER_NOT_EXISTS);
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
new file mode 100644
index 0000000..68f8e57
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+/**
+ * task node depend type
+ */
+public enum ComplementDependentMode {
+ /**
+ * 0 off mode
+ * 1 run complement data with all dependent process
+ */
+ OFF_MODE(0,"off mode"),
+ ALL_DEPENDENT(1,"all dependent");
+
+ ComplementDependentMode(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ @EnumValue
+ private final int code;
+ private final String desc;
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
new file mode 100644
index 0000000..3952b40
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.util.List;
+
+/**
+ * dependent process definition
+ */
+public class DependentProcessDefinition {
+
+ /**
+ * process definition code
+ */
+ private long processDefinitionCode;
+
+ /**
+ * process definition name
+ */
+ private String processDefinitionName;
+
+ /**
+ * task definition name
+ */
+ private long taskDefinitionCode;
+
+ /**
+ * task definition params
+ */
+ private String taskParams;
+
+ /**
+ * schedule worker group
+ */
+ private String workerGroup;
+
+ /**
+ * get dependent cycle
+ * @return CycleEnum
+ */
+ public CycleEnum getDependentCycle() {
+ DependentParameters dependentParameters =
this.getDependentParameters();
+ List<DependentTaskModel> dependentTaskModelList =
dependentParameters.getDependTaskList();
+
+ for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
+ List<DependentItem> dependentItemList =
dependentTaskModel.getDependItemList();
+ for (DependentItem dependentItem : dependentItemList) {
+ if (this.getProcessDefinitionCode() ==
dependentItem.getDefinitionCode()) {
+ return cycle2CycleEnum(dependentItem.getCycle());
+ }
+ }
+ }
+
+ return CycleEnum.DAY;
+ }
+
+ public CycleEnum cycle2CycleEnum(String cycle) {
+ CycleEnum cycleEnum = null;
+
+ switch (cycle) {
+ case "day":
+ cycleEnum = CycleEnum.DAY;
+ break;
+ case "hour":
+ cycleEnum = CycleEnum.HOUR;
+ break;
+ case "week":
+ cycleEnum = CycleEnum.WEEK;
+ break;
+ case "month":
+ cycleEnum = CycleEnum.MONTH;
+ break;
+ default:
+ break;
+ }
+ return cycleEnum;
+ }
+
+ public DependentParameters getDependentParameters() {
+ return JSONUtils.parseObject(getDependence(),
DependentParameters.class);
+ }
+
+ public String getDependence() {
+ return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
+ }
+
+ public String getProcessDefinitionName() {
+ return this.processDefinitionName;
+ }
+
+ public void setprocessDefinitionName(String name) {
+ this.processDefinitionName = name;
+ }
+
+ public long getProcessDefinitionCode() {
+ return this.processDefinitionCode;
+ }
+
+ public void setProcessDefinitionCode(long code) {
+ this.processDefinitionCode = code;
+ }
+
+ public long getTaskDefinitionCode() {
+ return this.taskDefinitionCode;
+ }
+
+ public void setTaskDefinitionCode(long code) {
+ this.taskDefinitionCode = code;
+ }
+
+ public String getTaskParams() {
+ return this.taskParams;
+ }
+
+ public void setTaskParams(String taskParams) {
+ this.taskParams = taskParams;
+ }
+
+ public String getWorkerGroup() {
+ return this.workerGroup;
+ }
+
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index d0b2d32..2af88fc 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -86,4 +86,12 @@ public interface ScheduleMapper extends BaseMapper<Schedule>
{
* @return schedule
*/
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long
processDefinitionCode);
+
+ /**
+ * query worker group list by process definition code
+ *
+ * @param processDefinitionCodeList processDefinitionCodeList
+ * @return schedule
+ */
+ List<Schedule>
querySchedulesByProcessDefinitionCodes(@Param("processDefinitionCodeList")
List<Long> processDefinitionCodeList);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index 249e42a..314f542 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
@@ -69,4 +70,12 @@ public interface WorkFlowLineageMapper {
*/
List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long
projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
+
+
+ /**
+ * query process definition by name
+ *
+ * @return dependent process definition
+ */
+ List<DependentProcessDefinition>
queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long
code);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
index 85a26dd..b22ac9f 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
@@ -70,6 +70,17 @@
from t_ds_schedules
where process_definition_code = #{processDefinitionCode}
</select>
+
+ <select id="querySchedulesByProcessDefinitionCodes"
resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+ select
+ <include refid="baseSql"/>
+ from t_ds_schedules
+ where process_definition_code in
+ <foreach collection="processDefinitionCodeList" item="code"
index="index" open="(" close=")" separator=",">
+ #{code}
+ </foreach>
+ </select>
+
<select id="queryReleaseSchedulerListByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 2d94b04..eed6476 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -88,4 +88,21 @@
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
</select>
+
+ <select id="queryDependentProcessDefinitionByProcessDefinitionCode"
resultType="DependentProcessDefinition">
+ SELECT
+ c.code AS process_definition_code
+ ,c.name AS process_definition_name
+ ,a.code AS task_definition_code
+ ,a.task_params
+ FROM
+ t_ds_task_definition a
+ JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and
a.version = b.pre_task_version
+ JOIN t_ds_process_definition c ON c.code = b.process_definition_code
AND c.version = b.process_definition_version AND c.project_code = b.project_code
+ WHERE 1=1
+ <if test="code != null and code != ''">
+ AND a.task_params LIKE concat('%', #{code}, '%')
+ </if>
+ AND a.task_type = 'DEPENDENT'
+ </select>
</mapper>
diff --git
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index cea26c7..9fa1366 100644
---
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@@ -94,6 +95,7 @@ public class PythonGatewayServer extends
SpringBootServletInitializer {
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE =
TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
private static final int DEFAULT_DRY_RUN = 0;
+ private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE =
ComplementDependentMode.OFF_MODE;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@@ -341,7 +343,8 @@ public class PythonGatewayServer extends
SpringBootServletInitializer {
timeout,
null,
null,
- DEFAULT_DRY_RUN
+ DEFAULT_DRY_RUN,
+ COMPLEMENT_DEPENDENT_MODE
);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 43714e7..8fabaf9 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -60,6 +60,7 @@ import
org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqRule;
@@ -114,6 +115,7 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -142,6 +144,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -256,6 +259,9 @@ public class ProcessService {
@Autowired
private TaskGroupMapper taskGroupMapper;
+ @Autowired
+ private WorkFlowLineageMapper workFlowLineageMapper;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
*
@@ -1901,6 +1907,28 @@ public class ProcessService {
}
/**
+ * query Schedule by processDefinitionCode
+ *
+ * @param processDefinitionCodeList processDefinitionCodeList
+ * @see Schedule
+ */
+ public Map<Long, String>
queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
+ List<Schedule> processDefinitionScheduleList =
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
+ return
processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
+ Schedule::getWorkerGroup));
+ }
+
+ /**
+ * query dependent process definition by process definition code
+ *
+ * @param processDefinitionCode processDefinitionCode
+ * @see DependentProcessDefinition
+ */
+ public List<DependentProcessDefinition>
queryDependentProcessDefinitionByProcessDefinitionCode(long
processDefinitionCode) {
+ return
workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+ }
+
+ /**
* query need failover process instance
*
* @param host host