This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0db9bbd538 [fix] delete workflow or task used by other tasks (#10873)
0db9bbd538 is described below
commit 0db9bbd538b0a2301345e1f0cae3e3623039f1af
Author: Jiajie Zhong <[email protected]>
AuthorDate: Tue Jul 12 21:22:06 2022 +0800
[fix] delete workflow or task used by other tasks (#10873)
Currently, users can delete process definitions that are used
by other sub-process tasks or by dependent tasks.
Deleting them breaks the dependencies of those tasks
and causes the DAG to fail. This patch adds validation of
the delete behavior to avoid this error.
---
.../controller/ProcessDefinitionController.java | 4 +-
.../api/controller/WorkFlowLineageController.java | 41 ++++
.../apache/dolphinscheduler/api/enums/Status.java | 5 +-
.../api/service/ProcessDefinitionService.java | 4 +-
.../api/service/WorkFlowLineageService.java | 23 ++
.../service/impl/ProcessDefinitionServiceImpl.java | 183 ++++++++++-----
.../service/impl/WorkFlowLineageServiceImpl.java | 44 ++++
.../api/service/ProcessDefinitionServiceTest.java | 117 ++++++----
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../dolphinscheduler/dao/entity/TaskMainInfo.java | 109 +--------
.../dolphinscheduler/dao/entity/TaskRecord.java | 257 ---------------------
.../dao/mapper/WorkFlowLineageMapper.java | 41 ++++
.../dao/mapper/WorkFlowLineageMapper.xml | 82 +++++++
13 files changed, 441 insertions(+), 470 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index e5ffc69732..89bdf3f086 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -208,7 +208,7 @@ public class ProcessDefinitionController extends
BaseController {
}
/**
- * update process definition
+ * update process definition, with whole process definition object
including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@@ -791,7 +791,7 @@ public class ProcessDefinitionController extends
BaseController {
}
/**
- * update process definition basic info
+ * update process definition basic info, not including task definition,
task relation and location.
*
* @param loginUser login user
* @param projectCode project code
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
index f89906c6f3..8319acc2ad 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
@@ -18,17 +18,23 @@
package org.apache.dolphinscheduler.api.controller;
import static
org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR;
+import static
org.apache.dolphinscheduler.api.enums.Status.TASK_WITH_DEPENDENT_ERROR;
import static org.apache.dolphinscheduler.common.Constants.SESSION_USER;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -43,6 +50,8 @@ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
@@ -106,4 +115,36 @@ public class WorkFlowLineageController extends
BaseController {
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(),
QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
}
}
+
+ /**
+     * Whether the task can be deleted or not, preventing tasks that other
tasks depend on from being deleted by accident.
+ *
+ * @param loginUser login user
+     * @param projectCode project code to which the taskCode belongs
+     * @param processDefinitionCode process definition code to which the
taskCode belongs
+     * @param taskCode task definition code
+ * @return Result of task can be delete or not
+ */
+ @ApiOperation(value = "verifyTaskCanDelete", notes =
"VERIFY_TASK_CAN_DELETE")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "projectCode", value =
"PROCESS_DEFINITION_NAME", required = true, type = "Long"),
+ @ApiImplicitParam(name = "processDefinitionCode", value =
"PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"),
+ @ApiImplicitParam(name = "taskCode", value = "TASK_DEFINITION_CODE",
required = true, dataType = "Long", example = "123456789"),
+ })
+ @PostMapping(value = "/tasks/verify-delete")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(TASK_WITH_DEPENDENT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result verifyTaskCanDelete(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value =
"processDefinitionCode", required = true) long processDefinitionCode,
+ @RequestParam(value = "taskCode",
required = true) long taskCode) {
+ Result result = new Result();
+ Optional<String> taskDepMsg =
workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode,
taskCode);
+ if (taskDepMsg.isPresent()) {
+ throw new ServiceException(taskDepMsg.get());
+ }
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 25212caafb..e54c23e906 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -193,7 +193,7 @@ public enum Status {
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition
error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error",
"查询血缘失败"),
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized
and user created project error error", "查询授权的和用户创建的项目错误"),
- DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition
by code fail, for there are {0} process instances in executing using it",
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+ DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163, "delete process definition
by code fail, for there are {0} process instances in executing using it",
"删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow
linux's users naming conventions", "非法的租户名,需要遵守 Linux 用户命名规范"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance
{0} is {1},Cannot perform force success operation",
"任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
@@ -217,6 +217,9 @@ public enum Status {
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh
page.", "该项目不存在,请刷新页面"),
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null",
"任务实例host为空"),
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error",
"查询运行的工作流实例错误"),
+ DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process
definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"),
+ DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by
other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"),
+ TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 49e3be075c..8ba7efb463 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -158,7 +158,7 @@ public interface ProcessDefinitionService {
long targetProjectCode);
/**
- * update process definition
+ * update process definition, with whole process definition object
including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@@ -398,7 +398,7 @@ public interface ProcessDefinitionService {
ProcessExecutionTypeEnum
executionType);
/**
- * update process definition basic info
+ * update process definition basic info, not including task definition,
task relation and location.
*
* @param loginUser login user
* @param projectCode project code
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
index f4c32b68f6..04902a4d0d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
@@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.api.service;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
+
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
/**
* work flow lineage service
@@ -29,4 +33,23 @@ public interface WorkFlowLineageService {
Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long
workFlowCode);
Map<String, Object> queryWorkFlowLineage(long projectCode);
+
+ /**
+ * Query tasks depend on process definition, include upstream or downstream
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return Set of TaskMainInfo
+ */
+ Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long
processDefinitionCode);
+
+ /**
+ * Query and return tasks dependence with string format, is a wrapper of
queryTaskDepOnTask and task query method.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return dependent process definition
+ */
+ Optional<String> taskDepOnTaskMsg(long projectCode, long
processDefinitionCode, long taskCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index d2e974052f..a33b7668e6 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,40 +17,26 @@
package org.apache.dolphinscheduler.api.service.impl;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -61,6 +47,7 @@ import
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -96,6 +83,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -116,6 +104,36 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -123,25 +141,13 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
/**
* process definition service impl
@@ -204,6 +210,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private WorkFlowLineageService workFlowLineageService;
+
/**
* create process definition
*
@@ -614,6 +623,31 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return updateDagDefine(loginUser, taskRelationList, processDefinition,
processDefinitionDeepCopy, taskDefinitionLogs, otherParamsJson);
}
+ /**
+ * Task want to delete whether used in other task, should throw exception
when have be used.
+ *
+ * This function avoid delete task already dependencies by other tasks by
accident.
+ *
+ * @param processDefinition ProcessDefinition you change task definition
and task relation
+ * @param taskRelationList All the latest task relation list from process
definition
+ */
+ private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> taskRelationList) {
+ List<ProcessTaskRelation> oldProcessTaskRelationList =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ Set<ProcessTaskRelationLog> oldProcessTaskRelationSet =
oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
+ StringBuilder sb = new StringBuilder();
+ for (ProcessTaskRelationLog oldProcessTaskRelation:
oldProcessTaskRelationSet) {
+ boolean oldTaskExists =
taskRelationList.stream().anyMatch(relation ->
oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
+ if (!oldTaskExists) {
+ Optional<String> taskDepMsg =
workFlowLineageService.taskDepOnTaskMsg(
+ processDefinition.getProjectCode(),
oldProcessTaskRelation.getProcessDefinitionCode(),
oldProcessTaskRelation.getPostTaskCode());
+ taskDepMsg.ifPresent(sb::append);
+ }
+ if (sb.length() != 0) {
+ throw new ServiceException(sb.toString());
+ }
+ }
+ }
+
protected Map<String, Object> updateDagDefine(User loginUser,
List<ProcessTaskRelationLog>
taskRelationList,
ProcessDefinition
processDefinition,
@@ -656,6 +690,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
+
+ taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
@@ -698,6 +734,35 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
+ /**
+ * Process definition want to delete whether used in other task, should
throw exception when have be used.
+ *
+ * This function avoid delete process definition already dependencies by
other tasks by accident.
+ *
+ * @param processDefinition ProcessDefinition you change task definition
and task relation
+ */
+ private void processDefinitionUsedInOtherTaskValid(ProcessDefinition
processDefinition) {
+ // check process definition is already online
+ if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE,
processDefinition.getName());
+ }
+
+ // check process instances is already running
+ List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
Constants.NOT_TERMINATED_STATES);
+ if (CollectionUtils.isNotEmpty(processInstances)) {
+ throw new
ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL,
processInstances.size());
+ }
+
+ // check process used by other task, including subprocess and
dependent task type
+ Set<TaskMainInfo> taskDepOnProcess =
workFlowLineageService.queryTaskDepOnProcess(processDefinition.getProjectCode(),
processDefinition.getCode());
+ if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
+ String taskDepDetail = taskDepOnProcess.stream()
+ .map(task -> String.format(Constants.FORMAT_S_S_COLON,
task.getProcessDefinitionName(), task.getTaskName()))
+ .collect(Collectors.joining(Constants.COMMA));
+ throw new
ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL,
taskDepDetail);
+ }
+ }
+
/**
* delete process definition by code
*
@@ -727,17 +792,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- // check process definition is already online
- if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
- putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,
processDefinition.getName());
- return result;
- }
- // check process instances is already running
- List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(),
Constants.NOT_TERMINATED_STATES);
- if (CollectionUtils.isNotEmpty(processInstances)) {
- putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL,
processInstances.size());
- return result;
- }
+ processDefinitionUsedInOtherTaskValid(processDefinition);
// get the timing according to the process definition
Schedule scheduleObj =
scheduleMapper.queryByProcessDefinitionCode(code);
@@ -946,7 +1001,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
public Map<String, Object> importSqlProcessDefinition(User loginUser, long
projectCode, MultipartFile file) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
- result = projectService.checkProjectAndAuth(loginUser, project,
projectCode,WORKFLOW_IMPORT);
+ result = projectService.checkProjectAndAuth(loginUser, project,
projectCode,WORKFLOW_IMPORT);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 3b9d38ec02..836ac3be29 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -28,22 +28,26 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -66,6 +70,9 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
@Override
public Map<String, Object> queryWorkFlowLineageByName(long projectCode,
String workFlowName) {
Map<String, Object> result = new HashMap<>();
@@ -223,4 +230,41 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
}
return sourceWorkFlowCodes;
}
+
+ /**
+ * Query and return tasks dependence with string format, is a wrapper of
queryTaskDepOnTask and task query method.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return Optional of formatter message
+ */
+ @Override
+ public Optional<String> taskDepOnTaskMsg(long projectCode, long
processDefinitionCode, long taskCode) {
+ List<TaskMainInfo> tasksDep =
workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode,
taskCode);
+ if (CollectionUtils.isEmpty(tasksDep)) {
+ return Optional.empty();
+ }
+
+ String taskDepStr = tasksDep.stream().map(task ->
String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName())).collect(Collectors.joining(Constants.COMMA));
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
+ return
Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(),
taskDefinition.getName(), taskDepStr));
+ }
+
+ /**
+ * Query tasks depend on process definition, include upstream or downstream
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return Set of TaskMainInfo
+ */
+ @Override
+ public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long
processDefinitionCode) {
+ Set<TaskMainInfo> taskMainInfos = new HashSet<>();
+ List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode,
processDefinitionCode);
+ List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode,
processDefinitionCode);
+ taskMainInfos.addAll(taskDependents);
+ taskMainInfos.addAll(taskSubProcess);
+ return taskMainInfos;
+ }
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 4eb7daaa6e..f18530196e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -17,28 +17,19 @@
package org.apache.dolphinscheduler.api.service;
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-import javax.servlet.http.HttpServletResponse;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.google.common.collect.Lists;
+import static org.powermock.api.mockito.PowerMockito.mock;
-import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -58,6 +49,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -65,28 +57,44 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.servlet.http.HttpServletResponse;
+
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import static org.powermock.api.mockito.PowerMockito.mock;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
/**
* process definition service test
@@ -98,13 +106,13 @@ public class ProcessDefinitionServiceTest {
+
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+
"\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
- private static final String taskDefinitionJson =
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+
-
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+
-
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+
-
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+
-
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+
-
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+
-
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
+ private static final String taskDefinitionJson =
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ +
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ +
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@@ -130,14 +138,15 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessInstanceService processInstanceService;
- @Mock
- private TaskInstanceMapper taskInstanceMapper;
@Mock
private TenantMapper tenantMapper;
@Mock
private DataSourceMapper dataSourceMapper;
+ @Mock
+ private WorkFlowLineageService workFlowLineageService;
+
@Test
public void testQueryProcessDefinitionList() {
long projectCode = 1L;
@@ -392,8 +401,9 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
- Map<String, Object> dfOnlineRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
- Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE,
dfOnlineRes.get(Constants.STATUS));
+ Throwable exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser,
projectCode, 46L));
+ String formatter =
MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(),
processDefinition.getName());
+ Assertions.assertEquals(formatter, exception.getMessage());
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
@@ -403,6 +413,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode())).thenReturn(1);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerGreaterThanOneRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
Assert.assertEquals(Status.SUCCESS,
schedulerGreaterThanOneRes.get(Constants.STATUS));
@@ -411,15 +422,26 @@ public class ProcessDefinitionServiceTest {
schedule.setReleaseState(ReleaseState.ONLINE);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerOnlineRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE,
schedulerOnlineRes.get(Constants.STATUS));
+ //process used by other task, sub process
+ loginUser.setUserType(UserType.ADMIN_USER);
+ putMsg(result, Status.SUCCESS, projectCode);
+ TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
+ exception = Assertions.assertThrows(ServiceException.class, () ->
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L));
+ formatter =
MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(),
String.format("%s:%s", taskMainInfo.getProcessDefinitionName(),
taskMainInfo.getTaskName()));
+ Assertions.assertEquals(formatter, exception.getMessage());
+
//delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
+
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode())).thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> deleteSuccess =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode,
46L);
Assert.assertEquals(Status.SUCCESS,
deleteSuccess.get(Constants.STATUS));
@@ -812,4 +834,19 @@ public class ProcessDefinitionServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
+
+ /**
+ * get mock task main info
+ *
+     * @return list of mock TaskMainInfo
+ */
+ private List<TaskMainInfo> getTaskMainInfo() {
+ List<TaskMainInfo> taskMainInfos = new ArrayList<>();
+ TaskMainInfo taskMainInfo = new TaskMainInfo();
+ taskMainInfo.setId(1);
+ taskMainInfo.setProcessDefinitionName("process");
+ taskMainInfo.setTaskName("task");
+ taskMainInfos.add(taskMainInfo);
+ return taskMainInfos;
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1ea6bc0d8f..099620778e 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -52,6 +52,7 @@ public final class Constants {
public static final String
REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS =
"/lock/failover/startup-masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
+ public static final String FORMAT_S_S_COLON = "%s:%s";
public static final String FOLDER_SEPARATOR = "/";
public static final String RESOURCE_TYPE_FILE = "resources";
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
index 6fefd47990..7b5492b75b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
@@ -22,11 +22,16 @@ import
org.apache.dolphinscheduler.common.enums.ReleaseState;
import java.util.Date;
import java.util.Map;
+import lombok.Data;
+
/**
* task main info
*/
+@Data
public class TaskMainInfo {
+ private long id;
+
/**
* task name
*/
@@ -91,108 +96,4 @@ public class TaskMainInfo {
* upstreamTaskName
*/
private String upstreamTaskName;
-
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-
- public long getTaskCode() {
- return taskCode;
- }
-
- public void setTaskCode(long taskCode) {
- this.taskCode = taskCode;
- }
-
- public int getTaskVersion() {
- return taskVersion;
- }
-
- public void setTaskVersion(int taskVersion) {
- this.taskVersion = taskVersion;
- }
-
- public String getTaskType() {
- return taskType;
- }
-
- public void setTaskType(String taskType) {
- this.taskType = taskType;
- }
-
- public Date getTaskCreateTime() {
- return taskCreateTime;
- }
-
- public void setTaskCreateTime(Date taskCreateTime) {
- this.taskCreateTime = taskCreateTime;
- }
-
- public Date getTaskUpdateTime() {
- return taskUpdateTime;
- }
-
- public void setTaskUpdateTime(Date taskUpdateTime) {
- this.taskUpdateTime = taskUpdateTime;
- }
-
- public long getProcessDefinitionCode() {
- return processDefinitionCode;
- }
-
- public void setProcessDefinitionCode(long processDefinitionCode) {
- this.processDefinitionCode = processDefinitionCode;
- }
-
- public int getProcessDefinitionVersion() {
- return processDefinitionVersion;
- }
-
- public void setProcessDefinitionVersion(int processDefinitionVersion) {
- this.processDefinitionVersion = processDefinitionVersion;
- }
-
- public String getProcessDefinitionName() {
- return processDefinitionName;
- }
-
- public void setProcessDefinitionName(String processDefinitionName) {
- this.processDefinitionName = processDefinitionName;
- }
-
- public ReleaseState getProcessReleaseState() {
- return processReleaseState;
- }
-
- public void setProcessReleaseState(ReleaseState processReleaseState) {
- this.processReleaseState = processReleaseState;
- }
-
- public Map<Long, String> getUpstreamTaskMap() {
- return upstreamTaskMap;
- }
-
- public void setUpstreamTaskMap(Map<Long, String> upstreamTaskMap) {
- this.upstreamTaskMap = upstreamTaskMap;
- }
-
- public long getUpstreamTaskCode() {
- return upstreamTaskCode;
- }
-
- public void setUpstreamTaskCode(long upstreamTaskCode) {
- this.upstreamTaskCode = upstreamTaskCode;
- }
-
- public String getUpstreamTaskName() {
- return upstreamTaskName;
- }
-
- public void setUpstreamTaskName(String upstreamTaskName) {
- this.upstreamTaskName = upstreamTaskName;
- }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
deleted file mode 100644
index 5cfa095cec..0000000000
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.dao.entity;
-
-import java.util.Date;
-
-/**
- * task record for qianfan
- */
-public class TaskRecord {
-
- /**
- * id
- */
- private int id;
-
- /**
- * process id
- */
- private int procId;
-
- /**
- * procedure name
- */
- private String procName;
-
- /**
- * procedure date
- */
- private String procDate;
-
- /**
- * start date
- */
- private Date startTime;
-
- /**
- * end date
- */
- private Date endTime;
-
- /**
- * result
- */
- private String result;
-
- /**
- * duration unit: second
- */
- private int duration;
-
- /**
- * note
- */
- private String note;
-
- /**
- * schema
- */
- private String schema;
-
- /**
- * job id
- */
- private String jobId;
-
-
- /**
- * source tab
- */
- private String sourceTab;
-
- /**
- * source row count
- */
- private Long sourceRowCount;
-
- /**
- * target tab
- */
- private String targetTab;
-
- /**
- * target row count
- */
- private Long targetRowCount;
-
- /**
- * error code
- */
- private String errorCode;
-
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public int getProcId() {
- return procId;
- }
-
- public void setProcId(int procId) {
- this.procId = procId;
- }
-
- public String getProcName() {
- return procName;
- }
-
- public void setProcName(String procName) {
- this.procName = procName;
- }
-
- public String getProcDate() {
- return procDate;
- }
-
- public void setProcDate(String procDate) {
- this.procDate = procDate;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public String getResult() {
- return result;
- }
-
- public void setResult(String result) {
- this.result = result;
- }
-
- public int getDuration() {
- return duration;
- }
-
- public void setDuration(int duration) {
- this.duration = duration;
- }
-
- public String getNote() {
- return note;
- }
-
- public void setNote(String note) {
- this.note = note;
- }
-
- public String getSchema() {
- return schema;
- }
-
- public void setSchema(String schema) {
- this.schema = schema;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- public String getSourceTab() {
- return sourceTab;
- }
-
- public void setSourceTab(String sourceTab) {
- this.sourceTab = sourceTab;
- }
-
- public Long getSourceRowCount() {
- return sourceRowCount;
- }
-
- public void setSourceRowCount(Long sourceRowCount) {
- this.sourceRowCount = sourceRowCount;
- }
-
- public String getTargetTab() {
- return targetTab;
- }
-
- public void setTargetTab(String targetTab) {
- this.targetTab = targetTab;
- }
-
- public Long getTargetRowCount() {
- return targetRowCount;
- }
-
- public void setTargetRowCount(Long targetRowCount) {
- this.targetRowCount = targetRowCount;
- }
-
- public String getErrorCode() {
- return errorCode;
- }
-
- public void setErrorCode(String errorCode) {
- this.errorCode = errorCode;
- }
-
- @Override
- public String toString() {
- return "task record, id:" + id
- + " proc id:" + procId
- + " proc name:" + procName
- + " proc date: " + procDate
- + " start date:" + startTime
- + " end date:" + endTime
- + " result : " + result
- + " duration : " + duration
- + " note : " + note
- + " schema : " + schema
- + " job id : " + jobId
- + " source table : " + sourceTab
- + " source row count: " + sourceRowCount
- + " target table : " + targetTab
- + " target row count: " + targetRowCount
- + " error code: " + errorCode
- ;
- }
-
-}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
index dfdcafa640..951f12a14a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
@@ -103,4 +104,44 @@ public interface WorkFlowLineageMapper {
List<DependentProcessDefinition>
queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
@Param("taskType") String taskType);
+ /**
+     * Query all tasks of type SUB_PROCESS that depend on the given process definition.
+ *
+ * Query all upstream tasks from task type sub process.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return List of TaskMainInfo
+ */
+ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode")
long projectCode,
+
@Param("processDefinitionCode") long processDefinitionCode);
+
+ /**
+     * Query all tasks of type DEPENDENT that depend on the given process definition.
+ *
+ * Query all downstream tasks from task type dependent, method
`queryTaskDepOnTask` is a proper subset of
+ * current method `queryTaskDepOnProcess`, which means that with the same
parameter processDefinitionCode, all tasks in
+ * `queryTaskDepOnTask` are in the result of method
`queryTaskDepOnProcess`.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @return List of TaskMainInfo
+ */
+ List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode")
long projectCode,
+
@Param("processDefinitionCode") long processDefinitionCode);
+
+ /**
+ * Query all tasks depend on task, only downstream task support
currently(from dependent task type).
+ *
+ * In case of dependent task type, method `queryTaskDepOnTask` is a proper
subset of `queryTaskDepOnProcess`, which
+ * means that with the same processDefinitionCode, all tasks in
`queryTaskDepOnTask` are in method `queryTaskDepOnProcess`.
+ *
+ * @param projectCode Project code want to query tasks dependence
+ * @param processDefinitionCode Process definition code want to query
tasks dependence
+ * @param taskCode Task code want to query tasks dependence
+ * @return dependent process definition
+ */
+ List<TaskMainInfo> queryTaskDepOnTask(@Param("projectCode") long
projectCode,
+ @Param("processDefinitionCode") long
processDefinitionCode,
+ @Param("taskCode") long taskCode);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index 6b5e487c0a..b17499bb60 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -161,4 +161,86 @@
</if>
AND a.task_type = 'DEPENDENT'
</select>
+
+ <select id="queryTaskSubProcessDepOnProcess"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+ <!-- For subprocess task type, using
`concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')` -->
+ <if test="processDefinitionCode != 0">
+ and td.task_type = 'SUB_PROCESS'
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"processDefinitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ </where>
+ </select>
+
+ <select id="queryTaskDependentDepOnProcess"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+            <!-- For dependent task type, using `like
concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
+ <if test="processDefinitionCode != 0">
+ and td.task_type = 'DEPENDENT'
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"definitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ </where>
+ </select>
+
+ <select id="queryTaskDepOnTask"
resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
+ select td.id
+ , td.name as taskName
+ , td.code as taskCode
+ , td.version as taskVersion
+ , td.task_type as taskType
+ , ptr.process_definition_code as processDefinitionCode
+ , pd.name as processDefinitionName
+ , pd.version as processDefinitionVersion
+ , pd.release_state as processReleaseState
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code
and td.version = ptr.post_task_version
+ join t_ds_process_definition pd on pd.code =
ptr.process_definition_code and pd.version = ptr.process_definition_version
+ <where>
+ <if test="projectCode != 0">
+ and ptr.project_code = #{projectCode}
+ </if>
+ <!-- ptr.process_definition_code != #{processDefinitionCode} query
task not in current workflow -->
+ <if test="processDefinitionCode != 0">
+ and ptr.process_definition_code != #{processDefinitionCode}
+ and td.task_params like concat('%"definitionCode":',
#{processDefinitionCode}, '%')
+ </if>
+ <if test="taskCode != 0">
+ and td.task_params like concat('%"depTaskCode":', #{taskCode},
'%')
+ </if>
+ </where>
+ </select>
</mapper>