This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
     new e500dc5b9f [fix] Validate before deleting workflow or task used by 
other tasks (#10873) (#12731)
e500dc5b9f is described below

commit e500dc5b9fede022bac5ed6bc37755b2535cffe5
Author: Eric Gao <[email protected]>
AuthorDate: Sun Nov 6 00:06:34 2022 +0800

    [fix] Validate before deleting workflow or task used by other tasks 
(#10873) (#12731)
    
    Co-authored-by: Jiajie Zhong <[email protected]>
---
 .../controller/ProcessDefinitionController.java    |   4 +-
 .../api/controller/WorkFlowLineageController.java  |  41 ++++
 .../apache/dolphinscheduler/api/enums/Status.java  |   5 +-
 .../api/service/ProcessDefinitionService.java      |   4 +-
 .../api/service/WorkFlowLineageService.java        |  23 ++
 .../service/impl/ProcessDefinitionServiceImpl.java |  84 ++++++-
 .../service/impl/WorkFlowLineageServiceImpl.java   |  44 ++++
 .../api/service/ProcessDefinitionServiceTest.java  |  57 ++++-
 .../apache/dolphinscheduler/common/Constants.java  |   1 +
 .../dolphinscheduler/dao/entity/TaskMainInfo.java  | 109 +--------
 .../dolphinscheduler/dao/entity/TaskRecord.java    | 257 ---------------------
 .../dao/mapper/WorkFlowLineageMapper.java          |  41 ++++
 .../dao/mapper/WorkFlowLineageMapper.xml           |  82 +++++++
 13 files changed, 362 insertions(+), 390 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index de62c740f5..2d9e4d834e 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -195,7 +195,7 @@ public class ProcessDefinitionController extends 
BaseController {
     }
 
     /**
-     * update process definition
+     * update process definition, with whole process definition object 
including task definition, task relation and location.
      *
      * @param loginUser login user
      * @param projectCode project code
@@ -772,7 +772,7 @@ public class ProcessDefinitionController extends 
BaseController {
     }
 
     /**
-     * update process definition basic info
+     * update process definition basic info, not including task definition, 
task relation and location.
      *
      * @param loginUser login user
      * @param projectCode project code
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
index f89906c6f3..8319acc2ad 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
@@ -18,17 +18,23 @@
 package org.apache.dolphinscheduler.api.controller;
 
 import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.TASK_WITH_DEPENDENT_ERROR;
 import static org.apache.dolphinscheduler.common.Constants.SESSION_USER;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
@@ -43,6 +50,8 @@ import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 
 import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
@@ -106,4 +115,36 @@ public class WorkFlowLineageController extends 
BaseController {
             return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), 
QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
         }
     }
+
+    /**
+     * Verify whether a task can be deleted, so that a task which other tasks 
of process definitions depend on is not deleted by accident.
+     *
+     * @param loginUser login user
+     * @param projectCode project code which taskCode belongs to
+     * @param processDefinitionCode process definition code which taskCode belongs to
+     * @param taskCode task definition code
+     * @return Result indicating whether the task can be deleted or not
+     */
+    @ApiOperation(value = "verifyTaskCanDelete", notes = 
"VERIFY_TASK_CAN_DELETE")
+    @ApiImplicitParams({
+        @ApiImplicitParam(name = "projectCode", value = 
"PROCESS_DEFINITION_NAME", required = true, type = "Long"),
+        @ApiImplicitParam(name = "processDefinitionCode", value = 
"PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"),
+        @ApiImplicitParam(name = "taskCode", value = "TASK_DEFINITION_CODE", 
required = true, dataType = "Long", example = "123456789"),
+    })
+    @PostMapping(value = "/tasks/verify-delete")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(TASK_WITH_DEPENDENT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result verifyTaskCanDelete(@ApiIgnore @RequestAttribute(value = 
Constants.SESSION_USER) User loginUser,
+                                      @ApiParam(name = "projectCode", value = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                      @RequestParam(value = 
"processDefinitionCode", required = true) long processDefinitionCode,
+                                      @RequestParam(value = "taskCode", 
required = true) long taskCode) {
+        Result result = new Result();
+        Optional<String> taskDepMsg = 
workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode, 
taskCode);
+        if (taskDepMsg.isPresent()) {
+            throw new ServiceException(taskDepMsg.get());
+        }
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index a73c0f4614..c83e278340 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -193,7 +193,7 @@ public enum Status {
     BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition 
error", "移动工作流错误"),
     QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", 
"查询血缘失败"),
     QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized 
and user created project error error", "查询授权的和用户创建的项目错误"),
-    DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition 
by code fail, for there are {0} process instances in executing using it", 
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+    DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163, "delete process definition 
by code fail, for there are {0} process instances in executing using it", 
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
     CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow 
linux's users naming conventions", "非法的租户名,需要遵守 Linux 用户命名规范"),
     FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
     TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance 
{0} is {1},Cannot perform force success operation", 
"任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
@@ -217,6 +217,9 @@ public enum Status {
     PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh 
page.", "该项目不存在,请刷新页面"),
     TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", 
"任务实例host为空"),
     QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", 
"查询运行的工作流实例错误"),
+    DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process 
definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"),
+    DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by 
other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"),
+    TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
 
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 4bcdfe22c6..dcd94f7a23 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -153,7 +153,7 @@ public interface ProcessDefinitionService {
                                                    long targetProjectCode);
 
     /**
-     * update  process definition
+     * update process definition, with whole process definition object 
including task definition, task relation and location.
      *
      * @param loginUser login user
      * @param projectCode project code
@@ -393,7 +393,7 @@ public interface ProcessDefinitionService {
                                                      ProcessExecutionTypeEnum 
executionType);
 
     /**
-     * update process definition basic info
+     * update process definition basic info, not including task definition, 
task relation and location.
      *
      * @param loginUser login user
      * @param projectCode project code
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
index f4c32b68f6..04902a4d0d 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
@@ -17,7 +17,11 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
+
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * work flow lineage service
@@ -29,4 +33,23 @@ public interface WorkFlowLineageService {
     Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long 
workFlowCode);
 
     Map<String, Object> queryWorkFlowLineage(long projectCode);
+
+    /**
+     * Query tasks depend on process definition, include upstream or downstream
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return Set of TaskMainInfo
+     */
+    Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long 
processDefinitionCode);
+
+    /**
+     * Query and return tasks dependence with string format, is a wrapper of 
queryTaskDepOnTask and task query method.
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @param taskCode Task code want to query tasks dependence
+     * @return Optional of the formatted dependence message, empty when no task depends on the given task
+     */
+    Optional<String> taskDepOnTaskMsg(long projectCode, long 
processDefinitionCode, long taskCode);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 003ec7f5e1..06736d3be4 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
 import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.FileUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -70,6 +71,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -109,6 +111,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -194,6 +198,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     @Autowired
     private TaskPluginManager taskPluginManager;
 
+    @Autowired
+    private WorkFlowLineageService workFlowLineageService;
+
     /**
      * create process definition
      *
@@ -600,7 +607,32 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         return updateDagDefine(loginUser, taskRelationList, processDefinition, 
processDefinitionDeepCopy, taskDefinitionLogs);
     }
 
-    private Map<String, Object> updateDagDefine(User loginUser,
+    /**
+     * Check whether a task about to be deleted is used by other tasks, and 
throw an exception if it is.
+     *
+     * This check avoids deleting, by accident, a task that other tasks already 
depend on.
+     *
+     * @param processDefinition ProcessDefinition you change task definition 
and task relation
+     * @param taskRelationList All the latest task relation list from process 
definition
+     */
+    private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, 
List<ProcessTaskRelationLog> taskRelationList) {
+        List<ProcessTaskRelation> oldProcessTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
 processDefinition.getCode());
+        Set<ProcessTaskRelationLog> oldProcessTaskRelationSet = 
oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
+        StringBuilder sb = new StringBuilder();
+        for (ProcessTaskRelationLog oldProcessTaskRelation: 
oldProcessTaskRelationSet) {
+            boolean oldTaskExists = 
taskRelationList.stream().anyMatch(relation -> 
oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
+            if (!oldTaskExists) {
+                Optional<String> taskDepMsg = 
workFlowLineageService.taskDepOnTaskMsg(
+                        processDefinition.getProjectCode(), 
oldProcessTaskRelation.getProcessDefinitionCode(), 
oldProcessTaskRelation.getPostTaskCode());
+                taskDepMsg.ifPresent(sb::append);
+            }
+            if (sb.length() != 0) {
+                throw new ServiceException(sb.toString());
+            }
+        }
+    }
+
+    protected Map<String, Object> updateDagDefine(User loginUser,
                                                 List<ProcessTaskRelationLog> 
taskRelationList,
                                                 ProcessDefinition 
processDefinition,
                                                 ProcessDefinition 
processDefinitionDeepCopy,
@@ -641,6 +673,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 throw new 
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
             }
+
+            taskUsedInOtherTaskValid(processDefinition, taskRelationList);
             int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
                 processDefinition.getCode(), insertVersion, taskRelationList, 
taskDefinitionLogs, Boolean.TRUE);
             if (insertResult == Constants.EXIT_CODE_SUCCESS) {
@@ -686,6 +720,35 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         return result;
     }
 
+    /**
+     * Check whether a process definition about to be deleted is used by other 
tasks, and throw an exception if it is.
+     *
+     * This check avoids deleting, by accident, a process definition that other 
tasks already depend on.
+     *
+     * @param processDefinition ProcessDefinition that is about to be 
deleted
+     */
+    private void processDefinitionUsedInOtherTaskValid(ProcessDefinition 
processDefinition) {
+        // check process definition is already online
+        if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+            throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE, 
processDefinition.getName());
+        }
+
+        // check process instances is already running
+        List<ProcessInstance> processInstances = 
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
 Constants.NOT_TERMINATED_STATES);
+        if (CollectionUtils.isNotEmpty(processInstances)) {
+            throw new 
ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, 
processInstances.size());
+        }
+
+        // check process used by other task, including subprocess and 
dependent task type
+        Set<TaskMainInfo> taskDepOnProcess = 
workFlowLineageService.queryTaskDepOnProcess(processDefinition.getProjectCode(),
 processDefinition.getCode());
+        if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
+            String taskDepDetail = taskDepOnProcess.stream()
+                    .map(task -> String.format(Constants.FORMAT_S_S_COLON, 
task.getProcessDefinitionName(), task.getTaskName()))
+                    .collect(Collectors.joining(Constants.COMMA));
+            throw new 
ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, 
taskDepDetail);
+        }
+    }
+
     /**
      * delete process definition by code
      *
@@ -715,17 +778,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        // check process definition is already online
-        if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
-            putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, 
processDefinition.getName());
-            return result;
-        }
-        // check process instances is already running
-        List<ProcessInstance> processInstances = 
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
 Constants.NOT_TERMINATED_STATES);
-        if (CollectionUtils.isNotEmpty(processInstances)) {
-            putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL, 
processInstances.size());
-            return result;
-        }
+        processDefinitionUsedInOtherTaskValid(processDefinition);
 
         // get the timing according to the process definition
         Schedule scheduleObj = 
scheduleMapper.queryByProcessDefinitionCode(code);
@@ -928,6 +981,13 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     @Transactional
     public Map<String, Object> importSqlProcessDefinition(User loginUser, long 
projectCode, MultipartFile file) {
         Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByCode(projectCode);
+        result = projectService.checkProjectAndAuth(loginUser, project, 
projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
         String processDefinitionName = file.getOriginalFilename() == null ? 
file.getName() : file.getOriginalFilename();
         int index = processDefinitionName.lastIndexOf(".");
         if (index > 0) {
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 3b9d38ec02..836ac3be29 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -28,22 +28,26 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessLineage;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -66,6 +70,9 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
     @Autowired
     private TaskDefinitionLogMapper taskDefinitionLogMapper;
 
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
     @Override
     public Map<String, Object> queryWorkFlowLineageByName(long projectCode, 
String workFlowName) {
         Map<String, Object> result = new HashMap<>();
@@ -223,4 +230,41 @@ public class WorkFlowLineageServiceImpl extends 
BaseServiceImpl implements WorkF
         }
         return sourceWorkFlowCodes;
     }
+
+    /**
+     * Query and return tasks dependence with string format, is a wrapper of 
queryTaskDepOnTask and task query method.
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @param taskCode Task code want to query tasks dependence
+     * @return Optional of formatter message
+     */
+    @Override
+    public Optional<String> taskDepOnTaskMsg(long projectCode, long 
processDefinitionCode, long taskCode) {
+        List<TaskMainInfo> tasksDep = 
workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode, 
taskCode);
+        if (CollectionUtils.isEmpty(tasksDep)) {
+            return Optional.empty();
+        }
+
+        String taskDepStr = tasksDep.stream().map(task -> 
String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), 
task.getTaskName())).collect(Collectors.joining(Constants.COMMA));
+        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByCode(taskCode);
+        return 
Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), 
taskDefinition.getName(), taskDepStr));
+    }
+
+    /**
+     * Query tasks depend on process definition, include upstream or downstream
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return Set of TaskMainInfo
+     */
+    @Override
+    public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long 
processDefinitionCode) {
+        Set<TaskMainInfo> taskMainInfos = new HashSet<>();
+        List<TaskMainInfo> taskDependents = 
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, 
processDefinitionCode);
+        List<TaskMainInfo> taskSubProcess = 
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, 
processDefinitionCode);
+        taskMainInfos.addAll(taskDependents);
+        taskMainInfos.addAll(taskSubProcess);
+        return taskMainInfos;
+    }
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 96cfee49e0..44d65f731c 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service;
 import static org.powermock.api.mockito.PowerMockito.mock;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import 
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -40,6 +41,7 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -47,7 +49,6 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -59,6 +60,7 @@ import java.nio.charset.StandardCharsets;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
@@ -82,6 +85,7 @@ import org.springframework.mock.web.MockMultipartFile;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 /**
@@ -94,13 +98,13 @@ public class ProcessDefinitionServiceTest {
             + 
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
             + 
"\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
 
-    private static final String taskDefinitionJson = 
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
 +
-        
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
 +
-        
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
 +
-        
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
 +
-        
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
 +
-        
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
 +
-        
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
+    private static final String taskDefinitionJson = 
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+            + 
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+            + 
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+            + 
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+            + 
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+            + 
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+            + 
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
 
     @InjectMocks
     private ProcessDefinitionServiceImpl processDefinitionService;
@@ -126,14 +130,15 @@ public class ProcessDefinitionServiceTest {
     @Mock
     private ProcessInstanceService processInstanceService;
 
-    @Mock
-    private TaskInstanceMapper taskInstanceMapper;
     @Mock
     private TenantMapper tenantMapper;
 
     @Mock
     private DataSourceMapper dataSourceMapper;
 
+    @Mock
+    private WorkFlowLineageService workFlowLineageService;
+
     @Test
     public void testQueryProcessDefinitionList() {
         long projectCode = 1L;
@@ -389,8 +394,9 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
         processDefinition.setReleaseState(ReleaseState.ONLINE);
         
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
-        Map<String, Object> dfOnlineRes = 
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 
46L);
-        Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, 
dfOnlineRes.get(Constants.STATUS));
+        Throwable exception = Assertions.assertThrows(ServiceException.class, 
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, 
projectCode, 46L));
+        String formatter = 
MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), 
processDefinition.getName());
+        Assertions.assertEquals(formatter, exception.getMessage());
 
         //scheduler list elements > 1
         processDefinition.setReleaseState(ReleaseState.OFFLINE);
@@ -401,6 +407,7 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
         
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), 
processDefinition.getCode())).thenReturn(1);
+        
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), 
processDefinition.getCode())).thenReturn(Collections.emptySet());
         Map<String, Object> schedulerGreaterThanOneRes = 
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 
46L);
         Assert.assertEquals(Status.SUCCESS, 
schedulerGreaterThanOneRes.get(Constants.STATUS));
 
@@ -410,15 +417,26 @@ public class ProcessDefinitionServiceTest {
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
         
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
+        
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), 
processDefinition.getCode())).thenReturn(Collections.emptySet());
         Map<String, Object> schedulerOnlineRes = 
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 
46L);
         Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, 
schedulerOnlineRes.get(Constants.STATUS));
 
+        //process used by other task, sub process
+        loginUser.setUserType(UserType.ADMIN_USER);
+        putMsg(result, Status.SUCCESS, projectCode);
+        TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
+        
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), 
processDefinition.getCode())).thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
+        exception = Assertions.assertThrows(ServiceException.class, () -> 
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 
46L));
+        formatter = 
MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(),
 String.format("%s:%s", taskMainInfo.getProcessDefinitionName(), 
taskMainInfo.getTaskName()));
+        Assertions.assertEquals(formatter, exception.getMessage());
+
         //delete success
         schedule.setReleaseState(ReleaseState.OFFLINE);
         Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
         
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), 
processDefinition.getCode())).thenReturn(1);
         
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
+        
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), 
processDefinition.getCode())).thenReturn(Collections.emptySet());
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
         Map<String, Object> deleteSuccess = 
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 
46L);
@@ -801,4 +819,19 @@ public class ProcessDefinitionServiceTest {
             result.put(Constants.MSG, status.getMsg());
         }
     }
+
+    /**
+     * get mock task main info
+     *
+     * @return list of mock task main info
+     */
+    private List<TaskMainInfo> getTaskMainInfo() {
+        List<TaskMainInfo> taskMainInfos = new ArrayList<>();
+        TaskMainInfo taskMainInfo = new TaskMainInfo();
+        taskMainInfo.setId(1);
+        taskMainInfo.setProcessDefinitionName("process");
+        taskMainInfo.setTaskName("task");
+        taskMainInfos.add(taskMainInfo);
+        return taskMainInfos;
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fcd6147421..26e611fe88 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -53,6 +53,7 @@ public final class Constants {
 
     public static final String FORMAT_SS = "%s%s";
     public static final String FORMAT_S_S = "%s/%s";
+    public static final String FORMAT_S_S_COLON = "%s:%s";
     public static final String FOLDER_SEPARATOR = "/";
 
     public static final String RESOURCE_TYPE_FILE = "resources";
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
index 6fefd47990..7b5492b75b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
@@ -22,11 +22,16 @@ import 
org.apache.dolphinscheduler.common.enums.ReleaseState;
 import java.util.Date;
 import java.util.Map;
 
+import lombok.Data;
+
 /**
  * task main info
  */
+@Data
 public class TaskMainInfo {
 
+    private long id;
+
     /**
      * task name
      */
@@ -91,108 +96,4 @@ public class TaskMainInfo {
      * upstreamTaskName
      */
     private String upstreamTaskName;
-
-    public String getTaskName() {
-        return taskName;
-    }
-
-    public void setTaskName(String taskName) {
-        this.taskName = taskName;
-    }
-
-    public long getTaskCode() {
-        return taskCode;
-    }
-
-    public void setTaskCode(long taskCode) {
-        this.taskCode = taskCode;
-    }
-
-    public int getTaskVersion() {
-        return taskVersion;
-    }
-
-    public void setTaskVersion(int taskVersion) {
-        this.taskVersion = taskVersion;
-    }
-
-    public String getTaskType() {
-        return taskType;
-    }
-
-    public void setTaskType(String taskType) {
-        this.taskType = taskType;
-    }
-
-    public Date getTaskCreateTime() {
-        return taskCreateTime;
-    }
-
-    public void setTaskCreateTime(Date taskCreateTime) {
-        this.taskCreateTime = taskCreateTime;
-    }
-
-    public Date getTaskUpdateTime() {
-        return taskUpdateTime;
-    }
-
-    public void setTaskUpdateTime(Date taskUpdateTime) {
-        this.taskUpdateTime = taskUpdateTime;
-    }
-
-    public long getProcessDefinitionCode() {
-        return processDefinitionCode;
-    }
-
-    public void setProcessDefinitionCode(long processDefinitionCode) {
-        this.processDefinitionCode = processDefinitionCode;
-    }
-
-    public int getProcessDefinitionVersion() {
-        return processDefinitionVersion;
-    }
-
-    public void setProcessDefinitionVersion(int processDefinitionVersion) {
-        this.processDefinitionVersion = processDefinitionVersion;
-    }
-
-    public String getProcessDefinitionName() {
-        return processDefinitionName;
-    }
-
-    public void setProcessDefinitionName(String processDefinitionName) {
-        this.processDefinitionName = processDefinitionName;
-    }
-
-    public ReleaseState getProcessReleaseState() {
-        return processReleaseState;
-    }
-
-    public void setProcessReleaseState(ReleaseState processReleaseState) {
-        this.processReleaseState = processReleaseState;
-    }
-
-    public Map<Long, String> getUpstreamTaskMap() {
-        return upstreamTaskMap;
-    }
-
-    public void setUpstreamTaskMap(Map<Long, String> upstreamTaskMap) {
-        this.upstreamTaskMap = upstreamTaskMap;
-    }
-
-    public long getUpstreamTaskCode() {
-        return upstreamTaskCode;
-    }
-
-    public void setUpstreamTaskCode(long upstreamTaskCode) {
-        this.upstreamTaskCode = upstreamTaskCode;
-    }
-
-    public String getUpstreamTaskName() {
-        return upstreamTaskName;
-    }
-
-    public void setUpstreamTaskName(String upstreamTaskName) {
-        this.upstreamTaskName = upstreamTaskName;
-    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
deleted file mode 100644
index 5cfa095cec..0000000000
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.dao.entity;
-
-import java.util.Date;
-
-/**
- * task record for qianfan
- */
-public class TaskRecord {
-
-    /**
-     * id
-     */
-    private int id;
-
-    /**
-     * process id
-     */
-    private int procId;
-
-    /**
-     * procedure name
-     */
-    private String procName;
-
-    /**
-     * procedure date
-     */
-    private String procDate;
-
-    /**
-     * start date
-     */
-    private Date startTime;
-
-    /**
-     * end date
-     */
-    private Date endTime;
-
-    /**
-     * result
-     */
-    private String result;
-
-    /**
-     * duration unit: second
-     */
-    private int duration;
-
-    /**
-     * note
-     */
-    private String note;
-
-    /**
-     * schema
-     */
-    private String schema;
-
-    /**
-     * job id
-     */
-    private String jobId;
-
-
-    /**
-     * source tab
-     */
-    private String sourceTab;
-
-    /**
-     * source row count
-     */
-    private Long sourceRowCount;
-
-    /**
-     * target tab
-     */
-    private String targetTab;
-
-    /**
-     * target row count
-     */
-    private Long targetRowCount;
-
-    /**
-     * error code
-     */
-    private String errorCode;
-
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public int getProcId() {
-        return procId;
-    }
-
-    public void setProcId(int procId) {
-        this.procId = procId;
-    }
-
-    public String getProcName() {
-        return procName;
-    }
-
-    public void setProcName(String procName) {
-        this.procName = procName;
-    }
-
-    public String getProcDate() {
-        return procDate;
-    }
-
-    public void setProcDate(String procDate) {
-        this.procDate = procDate;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public String getResult() {
-        return result;
-    }
-
-    public void setResult(String result) {
-        this.result = result;
-    }
-
-    public int getDuration() {
-        return duration;
-    }
-
-    public void setDuration(int duration) {
-        this.duration = duration;
-    }
-
-    public String getNote() {
-        return note;
-    }
-
-    public void setNote(String note) {
-        this.note = note;
-    }
-
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
-    }
-
-    public String getSourceTab() {
-        return sourceTab;
-    }
-
-    public void setSourceTab(String sourceTab) {
-        this.sourceTab = sourceTab;
-    }
-
-    public Long getSourceRowCount() {
-        return sourceRowCount;
-    }
-
-    public void setSourceRowCount(Long sourceRowCount) {
-        this.sourceRowCount = sourceRowCount;
-    }
-
-    public String getTargetTab() {
-        return targetTab;
-    }
-
-    public void setTargetTab(String targetTab) {
-        this.targetTab = targetTab;
-    }
-
-    public Long getTargetRowCount() {
-        return targetRowCount;
-    }
-
-    public void setTargetRowCount(Long targetRowCount) {
-        this.targetRowCount = targetRowCount;
-    }
-
-    public String getErrorCode() {
-        return errorCode;
-    }
-
-    public void setErrorCode(String errorCode) {
-        this.errorCode = errorCode;
-    }
-
-    @Override
-    public String toString() {
-        return "task record, id:" + id
-                + " proc id:" + procId
-                + " proc name:" + procName
-                + " proc date: " + procDate
-                + " start date:" + startTime
-                + " end date:" + endTime
-                + " result : " + result
-                + " duration : " + duration
-                + " note : " + note
-                + " schema : " + schema
-                + " job id : " + jobId
-                + " source table : " + sourceTab
-                + " source row count: " + sourceRowCount
-                + " target table : " + targetTab
-                + " target row count: " + targetRowCount
-                + " error code: " + errorCode
-                ;
-    }
-
-}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index dfdcafa640..951f12a14a 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 
 import org.apache.ibatis.annotations.Param;
@@ -103,4 +104,44 @@ public interface WorkFlowLineageMapper {
     List<DependentProcessDefinition> 
queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
                                                                                
          @Param("taskType") String taskType);
 
+    /**
+     * Query all tasks of type sub process that depend on the process definition.
+     *
+     * Query all upstream tasks from task type sub process.
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return List of TaskMainInfo
+     */
+    List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") 
long projectCode,
+                                                       
@Param("processDefinitionCode") long processDefinitionCode);
+
+    /**
+     * Query all tasks of type dependent that depend on the process definition.
+     *
+     * Query all downstream tasks from task type dependent. The result of method 
`queryTaskDepOnTask` is a proper subset of
+     * the current method `queryTaskDependentDepOnProcess`, which means that with the same 
parameter processDefinitionCode, all tasks in
+     * `queryTaskDepOnTask` are in the result of method 
`queryTaskDependentDepOnProcess`.
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return List of TaskMainInfo
+     */
+    List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") 
long projectCode,
+                                                      
@Param("processDefinitionCode") long processDefinitionCode);
+
+    /**
+     * Query all tasks that depend on a task; only downstream tasks are supported 
currently (from dependent task type).
+     *
+     * In case of dependent task type, method `queryTaskDepOnTask` is a proper 
subset of `queryTaskDepOnProcess`, which
+     * means that with the same processDefinitionCode, all tasks in 
`queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
+     *
+     * @param projectCode Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @param taskCode Task code want to query tasks dependence
+     * @return List of TaskMainInfo
+     */
+    List<TaskMainInfo> queryTaskDepOnTask(@Param("projectCode") long 
projectCode,
+                                          @Param("processDefinitionCode") long 
processDefinitionCode,
+                                          @Param("taskCode") long taskCode);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 1815cfcd78..2689b6d50f 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -162,4 +162,86 @@
         </if>
         AND a.task_type = 'DEPENDENT'
     </select>
+
+    <select id="queryTaskSubProcessDepOnProcess" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+        select td.id
+            , td.name as taskName
+            , td.code as taskCode
+            , td.version as taskVersion
+            , td.task_type as taskType
+            , ptr.process_definition_code as processDefinitionCode
+            , pd.name as processDefinitionName
+            , pd.version as processDefinitionVersion
+            , pd.release_state as processReleaseState
+        from t_ds_task_definition td
+        join t_ds_process_task_relation ptr on ptr.post_task_code = td.code 
and td.version = ptr.post_task_version
+        join t_ds_process_definition pd on pd.code = 
ptr.process_definition_code and pd.version = ptr.process_definition_version
+        <where>
+            <if test="projectCode != 0">
+                and ptr.project_code = #{projectCode}
+            </if>
+            <!-- ptr.process_definition_code != #{processDefinitionCode} query 
task not in current workflow -->
+            <!-- For subprocess task type, using 
`concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')` -->
+            <if test="processDefinitionCode != 0">
+                and td.task_type = 'SUB_PROCESS'
+                and ptr.process_definition_code != #{processDefinitionCode}
+                and td.task_params like concat('%"processDefinitionCode":', 
#{processDefinitionCode}, '%')
+            </if>
+        </where>
+    </select>
+
+    <select id="queryTaskDependentDepOnProcess" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+        select td.id
+        , td.name as taskName
+        , td.code as taskCode
+        , td.version as taskVersion
+        , td.task_type as taskType
+        , ptr.process_definition_code as processDefinitionCode
+        , pd.name as processDefinitionName
+        , pd.version as processDefinitionVersion
+        , pd.release_state as processReleaseState
+        from t_ds_task_definition td
+        join t_ds_process_task_relation ptr on ptr.post_task_code = td.code 
and td.version = ptr.post_task_version
+        join t_ds_process_definition pd on pd.code = 
ptr.process_definition_code and pd.version = ptr.process_definition_version
+        <where>
+            <if test="projectCode != 0">
+                and ptr.project_code = #{projectCode}
+            </if>
+            <!-- ptr.process_definition_code != #{processDefinitionCode} query 
task not in current workflow -->
+            <!-- For dependent task type, using `like 
concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
+            <if test="processDefinitionCode != 0">
+                and td.task_type = 'DEPENDENT'
+                and ptr.process_definition_code != #{processDefinitionCode}
+                and td.task_params like concat('%"definitionCode":', 
#{processDefinitionCode}, '%')
+            </if>
+        </where>
+    </select>
+
+    <select id="queryTaskDepOnTask" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+        select td.id
+             , td.name as taskName
+             , td.code as taskCode
+             , td.version as taskVersion
+             , td.task_type as taskType
+             , ptr.process_definition_code as processDefinitionCode
+             , pd.name as processDefinitionName
+             , pd.version as processDefinitionVersion
+             , pd.release_state as processReleaseState
+        from t_ds_task_definition td
+        join t_ds_process_task_relation ptr on ptr.post_task_code = td.code 
and td.version = ptr.post_task_version
+        join t_ds_process_definition pd on pd.code = 
ptr.process_definition_code and pd.version = ptr.process_definition_version
+        <where>
+            <if test="projectCode != 0">
+                and ptr.project_code = #{projectCode}
+            </if>
+            <!-- ptr.process_definition_code != #{processDefinitionCode} query 
task not in current workflow -->
+            <if test="processDefinitionCode != 0">
+                and ptr.process_definition_code != #{processDefinitionCode}
+                and td.task_params like concat('%"definitionCode":', 
#{processDefinitionCode}, '%')
+            </if>
+            <if test="taskCode != 0">
+                and td.task_params like concat('%"depTaskCode":', #{taskCode}, 
'%')
+            </if>
+        </where>
+    </select>
 </mapper>

Reply via email to