This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b265297 [Improvement][API] fix 6772 new api bug (#7098)
b265297 is described below
commit b265297fc86108501ce8ca306a38fcfed1eb2fe5
Author: JinYong Li <[email protected]>
AuthorDate: Thu Dec 2 14:06:39 2021 +0800
[Improvement][API] fix 6772 new api bug (#7098)
* add processDefinition releaseWorkflowAndSchedule
* add ProcessTaskRelationServiceImpl.moveTaskProcessRelation
* add checks for dependent and sub-process tasks
* fix api bug
* fix code style
---
.../controller/ProcessTaskRelationController.java | 2 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 120 ++++------
.../impl/ProcessTaskRelationServiceImpl.java | 249 ++++++++++++---------
.../service/impl/TaskDefinitionServiceImpl.java | 37 ++-
.../api/service/ProcessDefinitionServiceTest.java | 7 +
.../service/ProcessTaskRelationServiceTest.java | 91 +++++---
.../api/service/TaskDefinitionServiceImplTest.java | 27 ++-
.../dao/mapper/ProcessTaskRelationLogMapper.java | 9 +
.../dao/mapper/ProcessTaskRelationMapper.java | 16 +-
.../dao/mapper/ProcessTaskRelationLogMapper.xml | 16 +-
.../dao/mapper/ProcessTaskRelationMapper.xml | 28 +--
.../service/process/ProcessService.java | 3 +
12 files changed, 346 insertions(+), 259 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
index cd9f563..de911e1 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
@@ -142,7 +142,7 @@ public class ProcessTaskRelationController extends
BaseController {
}
/**
- * delete process task relation
+ * delete process task relation (delete task from workflow)
*
* @param loginUser login user
* @param projectCode project code
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index abe0706..0cc1f3b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -233,7 +233,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new
ProcessDefinition(projectCode, name, processDefinitionCode, description,
- globalParams, locations, timeout, loginUser.getId(), tenantId);
+ globalParams, locations, timeout, loginUser.getId(), tenantId);
processDefinition.setExecutionType(executionType);
return createDagDefine(loginUser, taskRelationList, processDefinition,
taskDefinitionLogs);
@@ -300,8 +300,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
List<ProcessTaskRelation> processTaskRelations =
taskRelationList.stream()
- .map(processTaskRelationLog ->
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
ProcessTaskRelation.class))
- .collect(Collectors.toList());
+ .map(processTaskRelationLog ->
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
ProcessTaskRelation.class))
+ .collect(Collectors.toList());
List<TaskNode> taskNodeList =
processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes =
taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@@ -412,7 +412,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage =
processDefinitionMapper.queryDefineListPaging(
- page, searchVal, userId, project.getCode(),
isAdmin(loginUser));
+ page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
@@ -591,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
+ processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -685,11 +685,14 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
int delete =
processDefinitionMapper.deleteById(processDefinition.getId());
- int deleteRelation =
processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode());
- if (delete == 0 || deleteRelation == 0) {
+ if (delete == 0) {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new
ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
+ int deleteRelation =
processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode());
+ if (deleteRelation == 0) {
+ logger.warn("The process definition has not relation, it will be
delete successfully");
+ }
putMsg(result, Status.SUCCESS);
return result;
}
@@ -1156,7 +1159,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processInstanceList.forEach(processInstance ->
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(),
processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList =
processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap =
taskDefinitionList.stream()
- .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
@@ -1171,8 +1174,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() :
processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new
Instance(processInstance.getId(), processInstance.getName(),
processInstance.getProcessDefinitionCode(),
- "", processInstance.getState().toString(),
processInstance.getStartTime(), endTime, processInstance.getHost(),
- DateUtils.format2Readable(endTime.getTime() -
processInstance.getStartTime().getTime())));
+ "", processInstance.getState().toString(),
processInstance.getStartTime(), endTime, processInstance.getHost(),
+ DateUtils.format2Readable(endTime.getTime() -
processInstance.getStartTime().getTime())));
}
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@@ -1363,7 +1366,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
-
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList =
processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
@@ -1532,13 +1535,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* create empty process definition
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param description description
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param description description
* @param globalParams globalParams
- * @param timeout timeout
- * @param tenantCode tenantCode
+ * @param timeout timeout
+ * @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @return process definition code
*/
@@ -1584,7 +1587,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new
ProcessDefinition(projectCode, name, processDefinitionCode, description,
- globalParams, "", timeout, loginUser.getId(), tenantId);
+ globalParams, "", timeout, loginUser.getId(), tenantId);
processDefinition.setExecutionType(executionType);
result = createEmptyDagDefine(loginUser, processDefinition);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
@@ -1596,7 +1599,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
// save dag schedule
- Map<String, Object> scheduleResult = createDagSchedule(loginUser,
project, processDefinition, scheduleJson);
+ Map<String, Object> scheduleResult = createDagSchedule(loginUser,
processDefinition, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
Status scheduleResultStatus = (Status)
scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
@@ -1617,10 +1620,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- private Map<String, Object> createDagSchedule(User loginUser,
- Project project,
- ProcessDefinition
processDefinition,
- String scheduleJson) {
+ private Map<String, Object> createDagSchedule(User loginUser,
ProcessDefinition processDefinition, String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule scheduleObj = JSONUtils.parseObject(scheduleJson,
Schedule.class);
if (scheduleObj == null) {
@@ -1651,12 +1651,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() ==
null ? -1 : scheduleObj.getEnvironmentCode());
scheduleMapper.insert(scheduleObj);
- /**
- * updateProcessInstance receivers and cc by process definition id
- */
- processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId());
- processDefinitionMapper.updateById(processDefinition);
-
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
@@ -1665,15 +1659,15 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* update process definition basic info
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param code process definition code
- * @param description description
- * @param globalParams globalParams
- * @param timeout timeout
- * @param tenantCode tenantCode
- * @param scheduleJson scheduleJson
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param code process definition code
+ * @param description description
+ * @param globalParams globalParams
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param scheduleJson scheduleJson
* @param executionType executionType
* @return update result code
*/
@@ -1728,7 +1722,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition processDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition),
ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams,
"", timeout, tenantId);
processDefinition.setExecutionType(executionType);
- result = updateDagDefineBasicInfo(loginUser, processDefinition,
processDefinitionDeepCopy);
+ List<ProcessTaskRelationLog> taskRelationList =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(),
processDefinition.getVersion());
+ result = updateDagDefine(loginUser, taskRelationList,
processDefinition, processDefinitionDeepCopy, Lists.newArrayList());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -1746,26 +1741,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- private Map<String, Object> updateDagDefineBasicInfo(User loginUser,
- ProcessDefinition
processDefinition,
- ProcessDefinition
processDefinitionDeepCopy) {
- Map<String, Object> result = new HashMap<>();
- int insertVersion;
- if (processDefinition.equals(processDefinitionDeepCopy)) {
- insertVersion = processDefinitionDeepCopy.getVersion();
- } else {
- processDefinition.setUpdateTime(new Date());
- insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
- }
- if (insertVersion == 0) {
- putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
- throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, processDefinition);
- return result;
- }
-
private Map<String, Object> updateDagSchedule(User loginUser,
long projectCode,
long processDefinitionCode,
@@ -1791,24 +1766,24 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
param.setTimezoneId(schedule.getTimezoneId());
return schedulerService.updateScheduleByProcessDefinitionCode(
- loginUser,
- projectCode,
- processDefinitionCode,
- JSONUtils.toJsonString(param),
- warningType,
- warningGroupId,
- failureStrategy,
- processInstancePriority,
- workerGroup,
- environmentCode);
+ loginUser,
+ projectCode,
+ processDefinitionCode,
+ JSONUtils.toJsonString(param),
+ warningType,
+ warningGroupId,
+ failureStrategy,
+ processInstancePriority,
+ workerGroup,
+ environmentCode);
}
/**
* release process definition and schedule
*
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param code process definition code
* @param releaseState releaseState
* @return update result code
*/
@@ -1846,6 +1821,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
+ scheduleObj.setReleaseState(ReleaseState.ONLINE);
scheduleMapper.updateById(scheduleObj);
break;
case OFFLINE:
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index d6c606c..b49a089 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -24,6 +24,7 @@ import
org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -90,11 +91,11 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
/**
* create process task relation
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param processDefinitionCode processDefinitionCode
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param preTaskCode preTaskCode
+ * @param postTaskCode postTaskCode
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@@ -162,6 +163,9 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
int insertLog =
processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if ((insert & insertLog) > 0) {
putMsg(result, Status.SUCCESS);
+ } else {
+ putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
return result;
}
@@ -173,6 +177,8 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
+ processTaskRelationLog.setConditionType(ConditionType.NONE);
+ processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(userId);
@@ -183,13 +189,14 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
/**
* move task to other processDefinition
*
- * @param loginUser login user info
- * @param projectCode project code
- * @param processDefinitionCode process definition code
+ * @param loginUser login user info
+ * @param projectCode project code
+ * @param processDefinitionCode process definition code
* @param targetProcessDefinitionCode target process definition code
- * @param taskCode the current task code (the post task code)
+ * @param taskCode the current task code (the post task
code)
* @return move result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> moveTaskProcessRelation(User loginUser, long
projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long
taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@@ -263,12 +270,22 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
}
Date now = new Date();
ProcessTaskRelation processTaskRelation = upstreamList.get(0);
+ ProcessTaskRelationLog processTaskRelationLog =
processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setUpdateTime(now);
+
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
+
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
+ processTaskRelationLog.setUpdateTime(now);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
int update = processTaskRelationMapper.updateById(processTaskRelation);
- if (update == 0) {
+ int updateLog =
processTaskRelationLogMapper.updateById(processTaskRelationLog);
+ if (update == 0 || updateLog == 0) {
putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
+ throw new
ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR);
+ } else {
+ putMsg(result, Status.SUCCESS);
}
return result;
}
@@ -282,6 +299,7 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
* @param taskCode the post task code
* @return delete result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteTaskProcessRelation(User loginUser, long
projectCode, long processDefinitionCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@@ -294,38 +312,48 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
return result;
}
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionCode);
+ return result;
+ }
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
+ if (null == taskDefinition) {
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
+ return result;
+ }
List<ProcessTaskRelation> downstreamList =
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode,
taskCode, 0L);
if (CollectionUtils.isNotEmpty(downstreamList)) {
Set<Long> postTaskCodes = downstreamList
- .stream()
- .map(ProcessTaskRelation::getPostTaskCode)
- .collect(Collectors.toSet());
+ .stream()
+ .map(ProcessTaskRelation::getPostTaskCode)
+ .collect(Collectors.toSet());
putMsg(result, Status.TASK_HAS_DOWNSTREAM,
org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
return result;
}
ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setPreTaskCode(taskCode);
+ processTaskRelationLog.setPostTaskCode(taskCode);
+ processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
int deleteRelation =
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- if (0 == deleteRelation) {
+ int deleteRelationLog =
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ if (0 == deleteRelation || 0 == deleteRelationLog) {
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- }
-
- TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
- if (null == taskDefinition) {
- putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
- return result;
+ throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
- ||
TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
- ||
TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
+ ||
TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
+ ||
TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
int deleteTaskDefinition =
taskDefinitionMapper.deleteByCode(taskCode);
if (0 == deleteTaskDefinition) {
- putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
+ throw new
ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
}
+ putMsg(result, Status.SUCCESS);
return result;
}
@@ -338,6 +366,7 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
* @param taskCode the post task code
* @return delete result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteUpstreamRelation(User loginUser, long
projectCode, String preTaskCodes, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@@ -347,11 +376,11 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(preTaskCodes)) {
- putMsg(result,Status.DATA_IS_NULL,"preTaskCodes");
+ putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
- Set<Long> preTaskCodesSet =
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
- Status status = deleteUpstreamRelation(projectCode,
preTaskCodesSet.toArray(new Long[0]), taskCode);
+ Status status = deleteUpstreamRelation(loginUser.getId(), projectCode,
+
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new),
taskCode);
if (status != Status.SUCCESS) {
putMsg(result, status);
}
@@ -367,6 +396,7 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
* @param taskCode the pre task code
* @return delete result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteDownstreamRelation(User loginUser, long
projectCode, String postTaskCodes, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@@ -376,26 +406,28 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(postTaskCodes)) {
- putMsg(result,Status.DATA_IS_NULL,"postTaskCodes");
+ putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
return result;
}
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+ Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
+ processTaskRelationList.stream()
+ .map(ProcessTaskRelationLog::new)
+
.collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode,
processTaskRelationLog -> processTaskRelationLog));
Set<Long> postTaskCodesSet =
Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
- List<Long> deleteFailedCodeList = new ArrayList<>();
- postTaskCodesSet.stream().forEach(
- postTaskCode -> {
- try {
- Status status = deleteUpstreamRelation(projectCode,
new Long[]{taskCode}, postTaskCode);
- if (Status.SUCCESS != status) {
- deleteFailedCodeList.add(postTaskCode);
- }
- } catch (Exception e) {
- deleteFailedCodeList.add(postTaskCode);
- }
-
- }
- );
- if (!deleteFailedCodeList.isEmpty()) {
- putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR,
String.join(",", deleteFailedCodeList.stream().map(o -> o +
"").collect(Collectors.toList())));
+ int delete = 0;
+ int deleteLog = 0;
+ for (long postTaskCode : postTaskCodesSet) {
+ ProcessTaskRelationLog processTaskRelationLog =
taskRelationLogMap.get(postTaskCode);
+ if (processTaskRelationLog != null) {
+ delete +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+ deleteLog +=
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ }
+ }
+ if ((delete & deleteLog) == 0) {
+ throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ } else {
+ putMsg(result, Status.SUCCESS);
}
return result;
}
@@ -420,15 +452,15 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
Set<TaskDefinition> taskDefinitions = processTaskRelationList
- .stream()
- .map(processTaskRelation -> {
- TaskDefinition taskDefinition = buildTaskDefinition();
-
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
-
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
-
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
- return taskDefinition;
- })
- .collect(Collectors.toSet());
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition();
+
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
+
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
taskDefinitionLogList =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
}
result.put(Constants.DATA_LIST, taskDefinitionLogList);
@@ -456,15 +488,15 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
Set<TaskDefinition> taskDefinitions = processTaskRelationList
- .stream()
- .map(processTaskRelation -> {
- TaskDefinition taskDefinition = buildTaskDefinition();
-
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
-
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
-
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
- return taskDefinition;
- })
- .collect(Collectors.toSet());
+ .stream()
+ .map(processTaskRelation -> {
+ TaskDefinition taskDefinition = buildTaskDefinition();
+
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
+
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
+
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
+ return taskDefinition;
+ })
+ .collect(Collectors.toSet());
taskDefinitionLogList =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
}
result.put(Constants.DATA_LIST, taskDefinitionLogList);
@@ -478,8 +510,8 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
- * @param preTaskCode pre task code
- * @param postTaskCode post task code
+ * @param preTaskCode pre task code
+ * @param postTaskCode post task code
* @return delete result code
*/
@Override
@@ -501,7 +533,7 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
}
ProcessTaskRelation processTaskRelation =
processTaskRelationList.get(0);
int upstreamCount = processTaskRelationMapper.countByCode(projectCode,
processTaskRelation.getProcessDefinitionCode(),
- 0L, processTaskRelation.getPostTaskCode());
+ 0L, processTaskRelation.getPostTaskCode());
if (upstreamCount == 0) {
putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
@@ -541,8 +573,8 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
}
TaskDefinition that = (TaskDefinition) o;
return getCode() == that.getCode()
- && getVersion() == that.getVersion()
- && getProjectCode() == that.getProjectCode();
+ && getVersion() == that.getVersion()
+ && getProjectCode() == that.getProjectCode();
}
@Override
@@ -560,52 +592,61 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
* @param taskCode pre task code
* @return status
*/
- private Status deleteUpstreamRelation(long projectCode, Long[]
preTaskCodes, long taskCode) {
+ private Status deleteUpstreamRelation(int userId, long projectCode, Long[]
preTaskCodes, long taskCode) {
List<ProcessTaskRelation> upstreamList =
processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode,
preTaskCodes);
if (CollectionUtils.isEmpty(upstreamList)) {
return Status.SUCCESS;
}
- Map<Long, List<ProcessTaskRelation>>
processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream()
-
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+ List<ProcessTaskRelationLog> upstreamLogList = new ArrayList<>();
+ Date now = new Date();
+ for (ProcessTaskRelation processTaskRelation : upstreamList) {
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ processTaskRelationLog.setOperator(userId);
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ upstreamLogList.add(processTaskRelationLog);
+ }
+ Map<Long, List<ProcessTaskRelationLog>>
processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream()
+
.collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode));
// count upstream relation group by process definition code
- List<Map<Long, Integer>> countListGroupByProcessDefinitionCode =
processTaskRelationMapper
- .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new
Long[0]), taskCode);
-
- List<ProcessTaskRelation> deletes = new ArrayList<>();
- List<ProcessTaskRelation> updates = new ArrayList<>();
-
- countListGroupByProcessDefinitionCode.stream().forEach(
- processDefinitionCodeUpstreamCountMap ->
-
processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
- o -> {
- Long processDefinitionCode = o.getKey();
- Integer count = o.getValue();
- List<ProcessTaskRelation>
processTaskRelationList =
processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
- if (count <=
processTaskRelationList.size()) {
- ProcessTaskRelation
processTaskRelation = processTaskRelationList.remove(0);
- if
(processTaskRelation.getPreTaskCode() != 0) {
-
processTaskRelation.setPreTaskCode(0);
-
processTaskRelation.setPreTaskVersion(0);
- updates.add(processTaskRelation);
- }
- }
- if (!processTaskRelationList.isEmpty()) {
-
deletes.addAll(processTaskRelationList);
- }
- }
- )
- );
-
- int update = 0;
- if (!updates.isEmpty()) {
- update =
processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates);
+ List<Map<String, Long>> countListGroupByProcessDefinitionCode =
processTaskRelationMapper
+ .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new
Long[0]), taskCode);
+
+ List<ProcessTaskRelationLog> deletes = new ArrayList<>();
+ List<ProcessTaskRelationLog> updates = new ArrayList<>();
+ for (Map<String, Long> codeCountMap :
countListGroupByProcessDefinitionCode) {
+ long processDefinitionCode =
codeCountMap.get("processDefinitionCode");
+ long countValue = codeCountMap.get("countValue");
+ List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
+ if (countValue <= processTaskRelationLogList.size()) {
+ ProcessTaskRelationLog processTaskRelationLog =
processTaskRelationLogList.remove(0);
+ if (processTaskRelationLog.getPreTaskCode() != 0) {
+ processTaskRelationLog.setPreTaskCode(0);
+ processTaskRelationLog.setPreTaskVersion(0);
+ updates.add(processTaskRelationLog);
+ }
+ }
+ if (!processTaskRelationLogList.isEmpty()) {
+ deletes.addAll(processTaskRelationLogList);
+ }
}
+ deletes.addAll(updates);
int delete = 0;
- if (!deletes.isEmpty()) {
- delete =
processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList()));
+ int deleteLog = 0;
+ for (ProcessTaskRelationLog processTaskRelationLog : deletes) {
+ delete +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+ deleteLog +=
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
}
- if (update < 0 || delete < 0) {
- return Status.DELETE_TASK_PROCESS_RELATION_ERROR;
+ if (delete == 0 || deleteLog == 0) { // summed mapper results: '&' misfires on e.g. 1 & 2
+ throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ } else {
+ if (!updates.isEmpty()) {
+ int insert = processTaskRelationMapper.batchInsert(updates);
+ int insertLog =
processTaskRelationLogMapper.batchInsert(updates);
+ if (insert == 0 || insertLog == 0) { // fail when either mapper reports zero
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ }
}
return Status.SUCCESS;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 5655f33..ea01525 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -32,10 +32,12 @@ import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -90,6 +92,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
+ private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+ @Autowired
private ProcessService processService;
@Autowired
@@ -255,7 +260,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setId(taskDefinition.getId());
taskDefinitionToUpdate.setProjectCode(projectCode);
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
- taskDefinitionToUpdate.setVersion(version + 1);
+ taskDefinitionToUpdate.setVersion(++version);
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);
@@ -268,6 +273,36 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
+ if (!processTaskRelationList.isEmpty()) {
+ List<ProcessTaskRelationLog> processTaskRelationLogList = new
ArrayList<>();
+ int delete = 0;
+ int deleteLog = 0;
+ for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ delete +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+ deleteLog +=
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ if (processTaskRelationLog.getPreTaskCode() == taskCode) {
+ processTaskRelationLog.setPreTaskVersion(version);
+ }
+ if (processTaskRelationLog.getPostTaskCode() == taskCode) {
+ processTaskRelationLog.setPostTaskVersion(version);
+ }
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ processTaskRelationLogList.add(processTaskRelationLog);
+ }
+ if (delete == 0 || deleteLog == 0) { // summed mapper results: '&' misfires on e.g. 1 & 2
+ throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ } else {
+ int insertRelation =
processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+ int insertRelationLog =
processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
+ if (insertRelation == 0 || insertRelationLog == 0) { // fail when either mapper reports zero
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ }
+ }
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
return result;
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 22413c5..a7358e4 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -429,6 +429,13 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(projectCode);
+ processTaskRelation.setProcessDefinitionCode(46L);
+ processTaskRelation.setPostTaskCode(123L);
+ processTaskRelationList.add(processTaskRelation);
+ Mockito.when(processService.findRelationByCode(projectCode,
46L)).thenReturn(processTaskRelationList);
Map<String, Object> onlineRes =
processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 0de0c91..37a4bb1 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -192,13 +192,13 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationUpstream1.setPostTaskCode(taskCode);
processTaskRelationUpstream1.setPreTaskVersion(1);
processTaskRelationUpstream1.setPreTaskCode(123);
- processTaskRelationUpstream0.setProcessDefinitionCode(124);
+ processTaskRelationUpstream1.setProcessDefinitionCode(124);
processTaskRelationUpstream1.setProjectCode(projectCode);
ProcessTaskRelation processTaskRelationUpstream2 = new
ProcessTaskRelation();
processTaskRelationUpstream2.setPostTaskCode(taskCode);
processTaskRelationUpstream2.setPreTaskVersion(2);
- processTaskRelationUpstream1.setPreTaskCode(123);
- processTaskRelationUpstream0.setProcessDefinitionCode(125);
+ processTaskRelationUpstream2.setPreTaskCode(123);
+ processTaskRelationUpstream2.setProcessDefinitionCode(125);
processTaskRelationUpstream2.setProjectCode(projectCode);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelationUpstream0);
@@ -238,6 +238,7 @@ public class ProcessTaskRelationServiceTest {
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(1L);
+ processDefinition.setVersion(1);
return processDefinition;
}
@@ -312,8 +313,17 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
+ processTaskRelationLog.setProjectCode(projectCode);
+ processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+ processTaskRelationLog.setPreTaskCode(0L);
+ processTaskRelationLog.setPreTaskVersion(0);
+ processTaskRelationLog.setPostTaskCode(taskCode);
+ processTaskRelationLog.setPostTaskVersion(1);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode,
processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
+
Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog);
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
+
Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@@ -435,27 +445,17 @@ public class ProcessTaskRelationServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
- List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskUpstreamRelationList(projectCode, taskCode);
-
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode,
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
- List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new
ArrayList<>();
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
- {
- put(123L, 2);
- }
- });
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
- {
- put(124L, 1);
- }
- });
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
- {
- put(125L, 3);
- }
- });
-
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
new Long[]{123L, 124L, 125L},
2)).thenReturn(countListGroupByProcessDefinitionCode);
-
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
ArrayList())).thenReturn(3);
- Mockito.when(processTaskRelationMapper.deleteBatchIds(new
ArrayList())).thenReturn(3);
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(projectCode);
+ processTaskRelation.setProcessDefinitionCode(1L);
+ processTaskRelation.setPreTaskCode(taskCode);
+ processTaskRelation.setPostTaskCode(123L);
+ processTaskRelationList.add(processTaskRelation);
+
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode,
taskCode)).thenReturn(processTaskRelationList);
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Map<String, Object> result1 =
processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode,
"123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@@ -473,27 +473,35 @@ public class ProcessTaskRelationServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
- List<ProcessTaskRelation> processTaskRelationList =
getProcessTaskUpstreamRelationList(projectCode, taskCode);
-
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode,
taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
- List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new
ArrayList<>();
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode,
taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList());
+ List<Map<String, Long>> countListGroupByProcessDefinitionCode = new
ArrayList<>();
+ countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
- put(123L, 2);
+ put("processDefinitionCode", 123L);
+ put("countValue", 2L);
}
});
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
- put(124L, 1);
+ put("processDefinitionCode", 124L);
+ put("countValue", 1L);
}
});
- countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>()
{
+ countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
- put(125L, 3);
+ put("processDefinitionCode", 125L);
+ put("countValue", 3L);
}
});
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
+ processTaskRelationLog.setProjectCode(projectCode);
+ processTaskRelationLog.setPreTaskCode(0L);
+ processTaskRelationLog.setPreTaskVersion(0);
+ processTaskRelationLog.setPostTaskCode(taskCode);
+ processTaskRelationLog.setPostTaskVersion(2);
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode,
new Long[]{123L, 124L, 125L},
2)).thenReturn(countListGroupByProcessDefinitionCode);
-
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new
ArrayList())).thenReturn(3);
- Mockito.when(processTaskRelationMapper.deleteBatchIds(new
ArrayList())).thenReturn(3);
+
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Map<String, Object> result1 =
processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode,
"123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@@ -501,8 +509,8 @@ public class ProcessTaskRelationServiceTest {
@Test
public void testDeleteTaskProcessRelation() {
long projectCode = 1L;
- long taskCode = 2L;
- long processDefinitionCode = 3L;
+ long taskCode = 1L;
+ long processDefinitionCode = 1L;
long preTaskCode = 4L;
long postTaskCode = 5L;
Project project = getProject(projectCode);
@@ -520,10 +528,19 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationLog.setPreTaskCode(taskCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
+
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1);
+ processTaskRelationLog = new ProcessTaskRelationLog();
+ processTaskRelationLog.setProjectCode(projectCode);
+ processTaskRelationLog.setPostTaskCode(taskCode);
+ processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+ processTaskRelationLog.setProcessDefinitionVersion(1);
+
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
result =
processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode,
processDefinitionCode, taskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 6defcc5..94f2ad5 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -42,7 +43,6 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@@ -50,7 +50,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-@Ignore
@RunWith(MockitoJUnitRunner.class)
public class TaskDefinitionServiceImplTest {
@@ -74,7 +73,6 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
- ;
@Test
public void createTaskDefinition() {
@@ -166,6 +164,7 @@ public class TaskDefinitionServiceImplTest {
@Test
public void deleteTaskDefinitionByCode() {
long projectCode = 1L;
+ long taskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@@ -177,13 +176,14 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
-
Mockito.when(processTaskRelationMapper.queryByTaskCode(Mockito.anyLong()))
+
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
+
Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(taskCode))
.thenReturn(new ArrayList<>());
- Mockito.when(taskDefinitionMapper.deleteByCode(Mockito.anyLong()))
+ Mockito.when(taskDefinitionMapper.deleteByCode(taskCode))
.thenReturn(1);
Map<String, Object> relation = taskDefinitionService
- .deleteTaskDefinitionByCode(loginUser, projectCode,
Mockito.anyLong());
+ .deleteTaskDefinitionByCode(loginUser, projectCode, taskCode);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@@ -241,6 +241,15 @@ public class TaskDefinitionServiceImplTest {
return project;
}
+ private TaskDefinition getTaskDefinition() {
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(1L);
+ taskDefinition.setCode(1L);
+ taskDefinition.setVersion(1);
+ taskDefinition.setTaskType(TaskType.SHELL.getDesc());
+ return taskDefinition;
+ }
+
@Test
public void checkJson() {
String taskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
@@ -305,12 +314,8 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> onlineTaskResult =
taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode,
ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS,
onlineTaskResult.get(Constants.STATUS));
- // process definition online, resource does not exist
- Map<String, Object> onlineResResult =
taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode,
ReleaseState.ONLINE);
- Assert.assertEquals(Status.SUCCESS,
onlineResResult.get(Constants.STATUS));
-
// release error code
Map<String, Object> failResult =
taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode,
ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
failResult.get(Constants.STATUS));
}
-}
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
index b086377..e4ffa49 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
@@ -65,4 +66,12 @@ public interface ProcessTaskRelationLogMapper extends
BaseMapper<ProcessTaskRela
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog
processTaskRelationLog);
+
+ /**
+ * query process task relation log
+ *
+ * @param processTaskRelation processTaskRelation
+ * @return process task relation log
+ */
+ ProcessTaskRelationLog
queryRelationLogByRelation(@Param("processTaskRelation") ProcessTaskRelation
processTaskRelation);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index c0aa1c7..5b74f46 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -105,16 +105,6 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* query task relation by codes
*
- * @param projectCode projectCode
- * @param taskCode taskCode
- * @param postTaskCodes postTaskCodes list
- * @return ProcessTaskRelation
- */
- List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode")
long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes")
Long[] postTaskCodes);
-
- /**
- * query task relation by codes
- *
* @param projectCode projectCode
* @param taskCode taskCode
* @param preTaskCodes preTaskCode list
@@ -130,8 +120,9 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* @param processDefinitionCodes processDefinitionCodes
* @return upstream count list group by process definition code
*/
- List<Map<Long, Integer>>
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long
projectCode,
-
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
@Param("taskCode") long taskCode);
+ List<Map<String, Long>>
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long
projectCode,
+
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
+
@Param("taskCode") long taskCode);
/**
* batch update process task relation pre task
@@ -176,4 +167,5 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
@Param("processDefinitionCode") long processDefinitionCode,
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
+
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
index c056dbf..c3c0579 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
@@ -51,9 +51,23 @@
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code =
#{processTaskRelationLog.processDefinitionCode}
and process_definition_version =
#{processTaskRelationLog.processDefinitionVersion}
+ <if test="processTaskRelationLog.preTaskCode !=0 and
processTaskRelationLog.preTaskVersion != 0">
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
+ </if>
and post_task_code = #{processTaskRelationLog.postTaskCode}
- and post_task_version = #{processTaskRelationLog.post_task_version}
+ and post_task_version = #{processTaskRelationLog.postTaskVersion}
</delete>
+ <select id="queryRelationLogByRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation_log
+ WHERE project_code = #{processTaskRelation.projectCode}
+ and process_definition_code =
#{processTaskRelation.processDefinitionCode}
+ and process_definition_version =
#{processTaskRelation.processDefinitionVersion}
+ and pre_task_code = #{processTaskRelation.preTaskCode}
+ and pre_task_version = #{processTaskRelation.preTaskVersion}
+ and post_task_code = #{processTaskRelation.postTaskCode}
+ and post_task_version = #{processTaskRelation.postTaskVersion}
+ </select>
</mapper>
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 66633d3..4598ec0 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -92,20 +92,6 @@
and post_task_code = #{taskCode}
</select>
- <select id="queryDownstreamByCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
- select
- <include refid="baseSql"/>
- from t_ds_process_task_relation
- WHERE project_code = #{projectCode}
- and pre_task_code = #{taskCode}
- <if test="postTaskCodes != null and postTaskCodes.length != 0">
- and post_task_code in
- <foreach collection="postTaskCodes" index="index" item="i"
open="(" separator="," close=")">
- #{i}
- </foreach>
- </if>
- </select>
-
<select id="queryUpstreamByCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
@@ -120,9 +106,9 @@
</if>
</select>
- <select id="countUpstreamByCodeGroupByProcessDefinitionCode"
resultType="java.lang.Integer">
- select process_definition_code,
- count(0)
+ <select id="countUpstreamByCodeGroupByProcessDefinitionCode"
resultType="java.util.HashMap">
+ select process_definition_code as processDefinitionCode,
+ count(0) as countValue
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and post_task_code = #{taskCode}
@@ -172,10 +158,12 @@
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code =
#{processTaskRelationLog.processDefinitionCode}
and process_definition_version =
#{processTaskRelationLog.processDefinitionVersion}
- and pre_task_code = #{processTaskRelationLog.preTaskCode}
- and pre_task_version = #{processTaskRelationLog.preTaskVersion}
+ <if test="processTaskRelationLog.preTaskCode !=0 and
processTaskRelationLog.preTaskVersion != 0">
+ and pre_task_code = #{processTaskRelationLog.preTaskCode}
+ and pre_task_version = #{processTaskRelationLog.preTaskVersion}
+ </if>
and post_task_code = #{processTaskRelationLog.postTaskCode}
- and post_task_version = #{processTaskRelationLog.post_task_version}
+ and post_task_version = #{processTaskRelationLog.postTaskVersion}
</delete>
<select id="countByCode" resultType="int">
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index ee3cca4..994262a 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2310,6 +2310,9 @@ public class ProcessService {
*/
public int saveTaskRelation(User operator, long projectCode, long
processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
+ if (taskRelationList.isEmpty()) {
+ return Constants.EXIT_CODE_SUCCESS;
+ }
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()