This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
new e500dc5b9f [fix] Validate before deleting workflow or task used by
other tasks (#10873) (#12731)
e500dc5b9f is described below
commit e500dc5b9fede022bac5ed6bc37755b2535cffe5
Author: Eric Gao <[email protected]>
AuthorDate: Sun Nov 6 00:06:34 2022 +0800
[fix] Validate before deleting workflow or task used by other tasks
(#10873) (#12731)
Co-authored-by: Jiajie Zhong <[email protected]>
---
.../controller/ProcessDefinitionController.java | 4 +-
.../api/controller/WorkFlowLineageController.java | 41 ++++
.../apache/dolphinscheduler/api/enums/Status.java | 5 +-
.../api/service/ProcessDefinitionService.java | 4 +-
.../api/service/WorkFlowLineageService.java | 23 ++
.../service/impl/ProcessDefinitionServiceImpl.java | 84 ++++++-
.../service/impl/WorkFlowLineageServiceImpl.java | 44 ++++
.../api/service/ProcessDefinitionServiceTest.java | 57 ++++-
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../dolphinscheduler/dao/entity/TaskMainInfo.java | 109 +--------
.../dolphinscheduler/dao/entity/TaskRecord.java | 257 ---------------------
.../dao/mapper/WorkFlowLineageMapper.java | 41 ++++
.../dao/mapper/WorkFlowLineageMapper.xml | 82 +++++++
13 files changed, 362 insertions(+), 390 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index de62c740f5..2d9e4d834e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -195,7 +195,7 @@ public class ProcessDefinitionController extends
BaseController {
}
/**
- * update process definition
+ * update process definition, with whole process definition object
including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@@ -772,7 +772,7 @@ public class ProcessDefinitionController extends
BaseController {
}
/**
- * update process definition basic info
+ * update process definition basic info, not including task definition,
task relation and location.
*
* @param loginUser login user
* @param projectCode project code
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
index f89906c6f3..8319acc2ad 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
@@ -18,17 +18,23 @@
package org.apache.dolphinscheduler.api.controller;
import static
org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR;
+import static
org.apache.dolphinscheduler.api.enums.Status.TASK_WITH_DEPENDENT_ERROR;
import static org.apache.dolphinscheduler.common.Constants.SESSION_USER;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -43,6 +50,8 @@ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
@@ -106,4 +115,36 @@ public class WorkFlowLineageController extends
BaseController {
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(),
QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
}
}
+
+ /**
+ * Whether task can be deleted or not, avoiding task depend on other task
of process definition delete by accident.
+ *
+ * @param loginUser login user
+ * @param projectCode project codes which taskCode belong
+ * @param processDefinitionCode project code which taskCode belong
+ * @param taskCode task definition code
+ * @return Result of task can be delete or not
+ */
+ @ApiOperation(value = "verifyTaskCanDelete", notes =
"VERIFY_TASK_CAN_DELETE")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "projectCode", value =
"PROCESS_DEFINITION_NAME", required = true, type = "Long"),
+ @ApiImplicitParam(name = "processDefinitionCode", value =
"PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"),
+ @ApiImplicitParam(name = "taskCode", value = "TASK_DEFINITION_CODE",
required = true, dataType = "Long", example = "123456789"),
+ })
+ @PostMapping(value = "/tasks/verify-delete")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(TASK_WITH_DEPENDENT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result verifyTaskCanDelete(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value =
"processDefinitionCode", required = true) long processDefinitionCode,
+ @RequestParam(value = "taskCode",
required = true) long taskCode) {
+ Result result = new Result();
+ Optional<String> taskDepMsg =
workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode,
taskCode);
+ if (taskDepMsg.isPresent()) {
+ throw new ServiceException(taskDepMsg.get());
+ }
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index a73c0f4614..c83e278340 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -193,7 +193,7 @@ public enum Status {
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition
error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error",
"查询血缘失败"),
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized
and user created project error error", "查询授权的和用户创建的项目错误"),
- DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition
by code fail, for there are {0} process instances in executing using it",
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+ DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163, "delete process definition
by code fail, for there are {0} process instances in executing using it",
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow
linux's users naming conventions", "非法的租户名,需要遵守 Linux 用户命名规范"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance
{0} is {1},Cannot perform force success operation",
"任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
@@ -217,6 +217,9 @@ public enum Status {
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh
page.", "该项目不存在,请刷新页面"),
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null",
"任务实例host为空"),
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error",
"查询运行的工作流实例错误"),
+ DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process
definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"),
+ DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by
other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"),
+ TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 4bcdfe22c6..dcd94f7a23 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -153,7 +153,7 @@ public interface ProcessDefinitionService {
long targetProjectCode);
/**
- * update process definition
+ * update process definition, with whole process definition object
including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@@ -393,7 +393,7 @@ public interface ProcessDefinitionService {
ProcessExecutionTypeEnum
executionType);
/**
- * update process definition basic info
+ * update process definition basic info, not including task definition,
task relation and location.
*
* @param loginUser login user
* @param projectCode project code
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
index f4c32b68f6..04902a4d0d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
@@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.api.service;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
+
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
/**
* work flow lineage service
@@ -29,4 +33,23 @@ public interface WorkFlowLineageService {
Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long
workFlowCode);
Map<String, Object> queryWorkFlowLineage(long projectCode);
+
+ /**
+ * Query tasks depend on process definition, include upstream or downstream
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return Set of TaskMainInfo
+ */
+ Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long
processDefinitionCode);
+
+ /**
+ * Query and return tasks dependence with string format, is a wrapper of
queryTaskDepOnTask and task query method.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return dependent process definition
+ */
+ Optional<String> taskDepOnTaskMsg(long projectCode, long
processDefinitionCode, long taskCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 003ec7f5e1..06736d3be4 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,6 +34,7 @@ import
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -70,6 +71,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -109,6 +111,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -194,6 +198,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private WorkFlowLineageService workFlowLineageService;
+
/**
* create process definition
*
@@ -600,7 +607,32 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return updateDagDefine(loginUser, taskRelationList, processDefinition,
processDefinitionDeepCopy, taskDefinitionLogs);
}
- private Map<String, Object> updateDagDefine(User loginUser,
+ /**
+ * Task want to delete whether used in other task, should throw exception
when have be used.
+ *
+ * This function avoid delete task already dependencies by other tasks by
accident.
+ *
+ * @param processDefinition ProcessDefinition you change task definition
and task relation
+ * @param taskRelationList All the latest task relation list from process
definition
+ */
+ private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> taskRelationList) {
+ List<ProcessTaskRelation> oldProcessTaskRelationList =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ Set<ProcessTaskRelationLog> oldProcessTaskRelationSet =
oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
+ StringBuilder sb = new StringBuilder();
+ for (ProcessTaskRelationLog oldProcessTaskRelation:
oldProcessTaskRelationSet) {
+ boolean oldTaskExists =
taskRelationList.stream().anyMatch(relation ->
oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
+ if (!oldTaskExists) {
+ Optional<String> taskDepMsg =
workFlowLineageService.taskDepOnTaskMsg(
+ processDefinition.getProjectCode(),
oldProcessTaskRelation.getProcessDefinitionCode(),
oldProcessTaskRelation.getPostTaskCode());
+ taskDepMsg.ifPresent(sb::append);
+ }
+ if (sb.length() != 0) {
+ throw new ServiceException(sb.toString());
+ }
+ }
+ }
+
+ protected Map<String, Object> updateDagDefine(User loginUser,
List<ProcessTaskRelationLog>
taskRelationList,
ProcessDefinition
processDefinition,
ProcessDefinition
processDefinitionDeepCopy,
@@ -641,6 +673,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
+
+ taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
@@ -686,6 +720,35 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
+ /**
+ * Process definition want to delete whether used in other task, should
throw exception when have be used.
+ *
+ * This function avoid delete process definition already dependencies by
other tasks by accident.
+ *
+ * @param processDefinition ProcessDefinition you change task definition
and task relation
+ */
+ private void processDefinitionUsedInOtherTaskValid(ProcessDefinition
processDefinition) {
+ // check process definition is already online
+ if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE,
processDefinition.getName());
+ }
+
+ // check process instances is already running
+ List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
Constants.NOT_TERMINATED_STATES);
+ if (CollectionUtils.isNotEmpty(processInstances)) {
+ throw new
ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL,
processInstances.size());
+ }
+
+ // check process used by other task, including subprocess and
dependent task type
+ Set<TaskMainInfo> taskDepOnProcess =
workFlowLineageService.queryTaskDepOnProcess(processDefinition.getProjectCode(),
processDefinition.getCode());
+ if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
+ String taskDepDetail = taskDepOnProcess.stream()
+ .map(task -> String.format(Constants.FORMAT_S_S_COLON,
task.getProcessDefinitionName(), task.getTaskName()))
+ .collect(Collectors.joining(Constants.COMMA));
+ throw new
ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL,
taskDepDetail);
+ }
+ }
+
/**
* delete process definition by code
*
@@ -715,17 +778,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- // check process definition is already online
- if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
- putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,
processDefinition.getName());
- return result;
- }
- // check process instances is already running
- List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
Constants.NOT_TERMINATED_STATES);
- if (CollectionUtils.isNotEmpty(processInstances)) {
- putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL,
processInstances.size());
- return result;
- }
+ processDefinitionUsedInOtherTaskValid(processDefinition);
// get the timing according to the process definition
Schedule scheduleObj =
scheduleMapper.queryByProcessDefinitionCode(code);
@@ -928,6 +981,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Transactional
public Map<String, Object> importSqlProcessDefinition(User loginUser, long
projectCode, MultipartFile file) {
Map<String, Object> result = new HashMap<>();
+
+ Project project = projectMapper.queryByCode(projectCode);
+ result = projectService.checkProjectAndAuth(loginUser, project,
projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
String processDefinitionName = file.getOriginalFilename() == null ?
file.getName() : file.getOriginalFilename();
int index = processDefinitionName.lastIndexOf(".");
if (index > 0) {
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 3b9d38ec02..836ac3be29 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -28,22 +28,26 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -66,6 +70,9 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
@Override
public Map<String, Object> queryWorkFlowLineageByName(long projectCode,
String workFlowName) {
Map<String, Object> result = new HashMap<>();
@@ -223,4 +230,41 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
}
return sourceWorkFlowCodes;
}
+
+ /**
+ * Query and return tasks dependence with string format, is a wrapper of
queryTaskDepOnTask and task query method.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return Optional of formatter message
+ */
+ @Override
+ public Optional<String> taskDepOnTaskMsg(long projectCode, long
processDefinitionCode, long taskCode) {
+ List<TaskMainInfo> tasksDep =
workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode,
taskCode);
+ if (CollectionUtils.isEmpty(tasksDep)) {
+ return Optional.empty();
+ }
+
+ String taskDepStr = tasksDep.stream().map(task ->
String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName())).collect(Collectors.joining(Constants.COMMA));
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
+ return
Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(),
taskDefinition.getName(), taskDepStr));
+ }
+
+ /**
+ * Query tasks depend on process definition, include upstream or downstream
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return Set of TaskMainInfo
+ */
+ @Override
+ public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long
processDefinitionCode) {
+ Set<TaskMainInfo> taskMainInfos = new HashSet<>();
+ List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode,
processDefinitionCode);
+ List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode,
processDefinitionCode);
+ taskMainInfos.addAll(taskDependents);
+ taskMainInfos.addAll(taskSubProcess);
+ return taskMainInfos;
+ }
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 96cfee49e0..44d65f731c 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -40,6 +41,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -47,7 +49,6 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -59,6 +60,7 @@ import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -73,6 +75,7 @@ import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@@ -82,6 +85,7 @@ import org.springframework.mock.web.MockMultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
/**
@@ -94,13 +98,13 @@ public class ProcessDefinitionServiceTest {
+
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+
"\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
- private static final String taskDefinitionJson =
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+
-
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+
-
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+
-
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+
-
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+
-
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+
-
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
+ private static final String taskDefinitionJson =
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ +
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ +
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@@ -126,14 +130,15 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessInstanceService processInstanceService;
- @Mock
- private TaskInstanceMapper taskInstanceMapper;
@Mock
private TenantMapper tenantMapper;
@Mock
private DataSourceMapper dataSourceMapper;
+ @Mock
+ private WorkFlowLineageService workFlowLineageService;
+
@Test
public void testQueryProcessDefinitionList() {
long projectCode = 1L;
@@ -389,8 +394,9 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
- Map<String, Object> dfOnlineRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
- Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE,
dfOnlineRes.get(Constants.STATUS));
+ Throwable exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser,
projectCode, 46L));
+ String formatter =
MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(),
processDefinition.getName());
+ Assertions.assertEquals(formatter, exception.getMessage());
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
@@ -401,6 +407,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode())).thenReturn(1);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerGreaterThanOneRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
Assert.assertEquals(Status.SUCCESS,
schedulerGreaterThanOneRes.get(Constants.STATUS));
@@ -410,15 +417,26 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerOnlineRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE,
schedulerOnlineRes.get(Constants.STATUS));
+ //process used by other task, sub process
+ loginUser.setUserType(UserType.ADMIN_USER);
+ putMsg(result, Status.SUCCESS, projectCode);
+ TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
+ exception = Assertions.assertThrows(ServiceException.class, () ->
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L));
+ formatter =
MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(),
String.format("%s:%s", taskMainInfo.getProcessDefinitionName(),
taskMainInfo.getTaskName()));
+ Assertions.assertEquals(formatter, exception.getMessage());
+
//delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
Map<String, Object> deleteSuccess =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
@@ -801,4 +819,19 @@ public class ProcessDefinitionServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
+
+ /**
+ * get mock task main info
+ *
+ * @return schedule
+ */
+ private List<TaskMainInfo> getTaskMainInfo() {
+ List<TaskMainInfo> taskMainInfos = new ArrayList<>();
+ TaskMainInfo taskMainInfo = new TaskMainInfo();
+ taskMainInfo.setId(1);
+ taskMainInfo.setProcessDefinitionName("process");
+ taskMainInfo.setTaskName("task");
+ taskMainInfos.add(taskMainInfo);
+ return taskMainInfos;
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fcd6147421..26e611fe88 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -53,6 +53,7 @@ public final class Constants {
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
+ public static final String FORMAT_S_S_COLON = "%s:%s";
public static final String FOLDER_SEPARATOR = "/";
public static final String RESOURCE_TYPE_FILE = "resources";
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
index 6fefd47990..7b5492b75b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
@@ -22,11 +22,16 @@ import
org.apache.dolphinscheduler.common.enums.ReleaseState;
import java.util.Date;
import java.util.Map;
+import lombok.Data;
+
/**
* task main info
*/
+@Data
public class TaskMainInfo {
+ private long id;
+
/**
* task name
*/
@@ -91,108 +96,4 @@ public class TaskMainInfo {
* upstreamTaskName
*/
private String upstreamTaskName;
-
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-
- public long getTaskCode() {
- return taskCode;
- }
-
- public void setTaskCode(long taskCode) {
- this.taskCode = taskCode;
- }
-
- public int getTaskVersion() {
- return taskVersion;
- }
-
- public void setTaskVersion(int taskVersion) {
- this.taskVersion = taskVersion;
- }
-
- public String getTaskType() {
- return taskType;
- }
-
- public void setTaskType(String taskType) {
- this.taskType = taskType;
- }
-
- public Date getTaskCreateTime() {
- return taskCreateTime;
- }
-
- public void setTaskCreateTime(Date taskCreateTime) {
- this.taskCreateTime = taskCreateTime;
- }
-
- public Date getTaskUpdateTime() {
- return taskUpdateTime;
- }
-
- public void setTaskUpdateTime(Date taskUpdateTime) {
- this.taskUpdateTime = taskUpdateTime;
- }
-
- public long getProcessDefinitionCode() {
- return processDefinitionCode;
- }
-
- public void setProcessDefinitionCode(long processDefinitionCode) {
- this.processDefinitionCode = processDefinitionCode;
- }
-
- public int getProcessDefinitionVersion() {
- return processDefinitionVersion;
- }
-
- public void setProcessDefinitionVersion(int processDefinitionVersion) {
- this.processDefinitionVersion = processDefinitionVersion;
- }
-
- public String getProcessDefinitionName() {
- return processDefinitionName;
- }
-
- public void setProcessDefinitionName(String processDefinitionName) {
- this.processDefinitionName = processDefinitionName;
- }
-
- public ReleaseState getProcessReleaseState() {
- return processReleaseState;
- }
-
- public void setProcessReleaseState(ReleaseState processReleaseState) {
- this.processReleaseState = processReleaseState;
- }
-
- public Map<Long, String> getUpstreamTaskMap() {
- return upstreamTaskMap;
- }
-
- public void setUpstreamTaskMap(Map<Long, String> upstreamTaskMap) {
- this.upstreamTaskMap = upstreamTaskMap;
- }
-
- public long getUpstreamTaskCode() {
- return upstreamTaskCode;
- }
-
- public void setUpstreamTaskCode(long upstreamTaskCode) {
- this.upstreamTaskCode = upstreamTaskCode;
- }
-
- public String getUpstreamTaskName() {
- return upstreamTaskName;
- }
-
- public void setUpstreamTaskName(String upstreamTaskName) {
- this.upstreamTaskName = upstreamTaskName;
- }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
deleted file mode 100644
index 5cfa095cec..0000000000
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.dao.entity;
-
-import java.util.Date;
-
-/**
- * task record for qianfan
- */
-public class TaskRecord {
-
- /**
- * id
- */
- private int id;
-
- /**
- * process id
- */
- private int procId;
-
- /**
- * procedure name
- */
- private String procName;
-
- /**
- * procedure date
- */
- private String procDate;
-
- /**
- * start date
- */
- private Date startTime;
-
- /**
- * end date
- */
- private Date endTime;
-
- /**
- * result
- */
- private String result;
-
- /**
- * duration unit: second
- */
- private int duration;
-
- /**
- * note
- */
- private String note;
-
- /**
- * schema
- */
- private String schema;
-
- /**
- * job id
- */
- private String jobId;
-
-
- /**
- * source tab
- */
- private String sourceTab;
-
- /**
- * source row count
- */
- private Long sourceRowCount;
-
- /**
- * target tab
- */
- private String targetTab;
-
- /**
- * target row count
- */
- private Long targetRowCount;
-
- /**
- * error code
- */
- private String errorCode;
-
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public int getProcId() {
- return procId;
- }
-
- public void setProcId(int procId) {
- this.procId = procId;
- }
-
- public String getProcName() {
- return procName;
- }
-
- public void setProcName(String procName) {
- this.procName = procName;
- }
-
- public String getProcDate() {
- return procDate;
- }
-
- public void setProcDate(String procDate) {
- this.procDate = procDate;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public String getResult() {
- return result;
- }
-
- public void setResult(String result) {
- this.result = result;
- }
-
- public int getDuration() {
- return duration;
- }
-
- public void setDuration(int duration) {
- this.duration = duration;
- }
-
- public String getNote() {
- return note;
- }
-
- public void setNote(String note) {
- this.note = note;
- }
-
- public String getSchema() {
- return schema;
- }
-
- public void setSchema(String schema) {
- this.schema = schema;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- public String getSourceTab() {
- return sourceTab;
- }
-
- public void setSourceTab(String sourceTab) {
- this.sourceTab = sourceTab;
- }
-
- public Long getSourceRowCount() {
- return sourceRowCount;
- }
-
- public void setSourceRowCount(Long sourceRowCount) {
- this.sourceRowCount = sourceRowCount;
- }
-
- public String getTargetTab() {
- return targetTab;
- }
-
- public void setTargetTab(String targetTab) {
- this.targetTab = targetTab;
- }
-
- public Long getTargetRowCount() {
- return targetRowCount;
- }
-
- public void setTargetRowCount(Long targetRowCount) {
- this.targetRowCount = targetRowCount;
- }
-
- public String getErrorCode() {
- return errorCode;
- }
-
- public void setErrorCode(String errorCode) {
- this.errorCode = errorCode;
- }
-
- @Override
- public String toString() {
- return "task record, id:" + id
- + " proc id:" + procId
- + " proc name:" + procName
- + " proc date: " + procDate
- + " start date:" + startTime
- + " end date:" + endTime
- + " result : " + result
- + " duration : " + duration
- + " note : " + note
- + " schema : " + schema
- + " job id : " + jobId
- + " source table : " + sourceTab
- + " source row count: " + sourceRowCount
- + " target table : " + targetTab
- + " target row count: " + targetRowCount
- + " error code: " + errorCode
- ;
- }
-
-}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index dfdcafa640..951f12a14a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
@@ -103,4 +104,44 @@ public interface WorkFlowLineageMapper {
List<DependentProcessDefinition>
queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
@Param("taskType") String taskType);
+ /**
+ * Query all tasks type sub process depend on process definition.
+ *
+ * Query all upstream tasks from task type sub process.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return List of TaskMainInfo
+ */
+ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode")
long projectCode,
+
@Param("processDefinitionCode") long processDefinitionCode);
+
+ /**
+ * Query all tasks type dependent depend on process definition.
+ *
+ * Query all downstream tasks from task type dependent, method
`queryTaskDepOnTask` is a proper subset of
+ * current method `queryTaskDepOnProcess`. Which mean with the same
parameter processDefinitionCode, all tasks in
+ * `queryTaskDepOnTask` are in the result of method
`queryTaskDepOnProcess`.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return List of TaskMainInfo
+ */
+ List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode")
long projectCode,
+
@Param("processDefinitionCode") long processDefinitionCode);
+
+ /**
+ * Query all tasks depend on task, only downstream task support
currently(from dependent task type).
+ *
+ * In case of dependent task type, method `queryTaskDepOnTask` is a proper
subset of `queryTaskDepOnProcess`. Which
+ * mean with the same processDefinitionCode, all tasks in
`queryTaskDepOnTask` are in method `queryTaskDepOnProcess`.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return dependent process definition
+ */
+ List<TaskMainInfo> queryTaskDepOnTask(@Param("projectCode") long
projectCode,
+ @Param("processDefinitionCode") long
processDefinitionCode,
+ @Param("taskCode") long taskCode);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 1815cfcd78..2689b6d50f 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -162,4 +162,86 @@
</if>
AND a.task_type = 'DEPENDENT'
</select>
+
+ <select id="queryTaskSubProcessDepOnProcess"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+ <!-- For subprocess task type, using
`concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')` -->
+ <if test="processDefinitionCode != 0">
+ and td.task_type = 'SUB_PROCESS'
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"processDefinitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ </where>
+ </select>
+
+ <select id="queryTaskDependentDepOnProcess"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+ <!-- For dependnet task type, using `like
concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
+ <if test="processDefinitionCode != 0">
+ and td.task_type = 'DEPENDENT'
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"definitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ </where>
+ </select>
+
+ <select id="queryTaskDepOnTask"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+ <if test="processDefinitionCode != 0">
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"definitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ <if test="taskCode != 0">
+ and td.task_params like concat('%"depTaskCode":', #{taskCode},
'%')
+ </if>
+ </where>
+ </select>
</mapper>