This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a4e72b pick 8270/8308 (#8355)
4a4e72b is described below
commit 4a4e72b0d271dc2a5b6e488257ce1a28628f52f7
Author: JinYong Li <[email protected]>
AuthorDate: Mon Feb 14 12:03:12 2022 +0800
pick 8270/8308 (#8355)
---
.../api/controller/TaskDefinitionController.java | 4 +-
.../impl/ProcessTaskRelationServiceImpl.java | 117 ++++++------
.../service/impl/TaskDefinitionServiceImpl.java | 202 +++++++++------------
3 files changed, 136 insertions(+), 187 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
index d00a052..9e59c39 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
@@ -60,6 +60,8 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
+import org.apache.commons.lang3.StringUtils;
+
/**
* task definition controller
*/
@@ -121,7 +123,7 @@ public class TaskDefinitionController extends BaseController {
@RequestParam(value =
"processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(value =
"taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
@RequestParam(value =
"upstreamCodes", required = false) String upstreamCodes) {
- Map<String, Object> result =
taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode,
processDefinitionCode, taskDefinitionJsonObj, upstreamCodes);
+ Map<String, Object> result =
taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode,
processDefinitionCode, taskDefinitionJsonObj,
StringUtils.defaultString(upstreamCodes));
return returnDataList(result);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index a864c61..5761584 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -73,9 +72,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
- private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
- @Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
@@ -115,75 +111,59 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result;
}
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L,
postTaskCode);
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelations =
Lists.newArrayList(processTaskRelationList);
if (!processTaskRelations.isEmpty()) {
- Map<Long, ProcessTaskRelation> preTaskCodeMap =
processTaskRelations.stream()
+ Map<Long, ProcessTaskRelation> preTaskCodeMap =
processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode,
processTaskRelation -> processTaskRelation));
- if (preTaskCodeMap.containsKey(preTaskCode) ||
(!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
- putMsg(result, Status.PROCESS_TASK_RELATION_EXIST,
processDefinitionCode);
- return result;
- }
- if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(preTaskCodeMap.get(0L));
- // delete no upstream
- int delete =
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- int deleteLog =
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
- if ((delete & deleteLog) == 0) {
- putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ if (!preTaskCodeMap.isEmpty()) {
+ if (preTaskCodeMap.containsKey(preTaskCode) ||
(!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
+ putMsg(result, Status.PROCESS_TASK_RELATION_EXIST,
processDefinitionCode);
+ return result;
+ }
+ if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
+ // delete no upstream
+ processTaskRelations.remove(preTaskCodeMap.get(0L));
}
}
}
- updateProcessDefiniteVersion(loginUser, result, processDefinition);
- Date now = new Date();
- List<ProcessTaskRelationLog> processTaskRelationLogs = new
ArrayList<>();
+ TaskDefinition postTaskDefinition =
taskDefinitionMapper.queryByCode(postTaskCode);
+ ProcessTaskRelation processTaskRelation =
setRelation(processDefinition, postTaskDefinition);
if (preTaskCode != 0L) {
- // upstream is or not exist
- List<ProcessTaskRelation> upstreamProcessTaskRelations =
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L,
preTaskCode);
TaskDefinition preTaskDefinition =
taskDefinitionMapper.queryByCode(preTaskCode);
- if (upstreamProcessTaskRelations.isEmpty()) {
- ProcessTaskRelationLog processTaskRelationLog =
setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition);
- processTaskRelationLog.setPreTaskCode(0L);
- processTaskRelationLog.setPreTaskVersion(0);
- processTaskRelationLogs.add(processTaskRelationLog);
+ List<ProcessTaskRelation> upstreamTaskRelationList =
processTaskRelations.stream().filter(r -> r.getPostTaskCode() ==
preTaskCode).collect(Collectors.toList());
+ // upstream is or not exist
+ if (upstreamTaskRelationList.isEmpty()) {
+ ProcessTaskRelation preProcessTaskRelation =
setRelation(processDefinition, preTaskDefinition);
+ preProcessTaskRelation.setPreTaskCode(0L);
+ preProcessTaskRelation.setPreTaskVersion(0);
+ processTaskRelations.add(preProcessTaskRelation);
}
- TaskDefinition postTaskDefinition =
taskDefinitionMapper.queryByCode(postTaskCode);
- ProcessTaskRelationLog processTaskRelationLog =
setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
- processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
-
processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
- processTaskRelationLogs.add(processTaskRelationLog);
- } else {
- TaskDefinition postTaskDefinition =
taskDefinitionMapper.queryByCode(postTaskCode);
- ProcessTaskRelationLog processTaskRelationLog =
setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
- processTaskRelationLog.setPreTaskCode(0L);
- processTaskRelationLog.setPreTaskVersion(0);
- processTaskRelationLogs.add(processTaskRelationLog);
- }
- int insert =
processTaskRelationMapper.batchInsert(processTaskRelationLogs);
- int insertLog =
processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
- if ((insert & insertLog) > 0) {
- putMsg(result, Status.SUCCESS);
+ processTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
+
processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
} else {
- putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelation.setPreTaskVersion(0);
}
+ processTaskRelations.add(processTaskRelation);
+ updateRelation(loginUser, result, processDefinition,
processTaskRelations);
return result;
}
- private ProcessTaskRelationLog setRelationLog(ProcessDefinition
processDefinition, Date now, int userId, TaskDefinition taskDefinition) {
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
-
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
-
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
- processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
- processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
- processTaskRelationLog.setConditionType(ConditionType.NONE);
- processTaskRelationLog.setConditionParams("{}");
- processTaskRelationLog.setCreateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- processTaskRelationLog.setOperator(userId);
- processTaskRelationLog.setOperateTime(now);
- return processTaskRelationLog;
+ private ProcessTaskRelation setRelation(ProcessDefinition
processDefinition, TaskDefinition taskDefinition) {
+ Date now = new Date();
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(processDefinition.getProjectCode());
+
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
+
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
+ processTaskRelation.setPostTaskCode(taskDefinition.getCode());
+ processTaskRelation.setPostTaskVersion(taskDefinition.getVersion());
+ processTaskRelation.setConditionType(ConditionType.NONE);
+ processTaskRelation.setConditionParams("{}");
+ processTaskRelation.setCreateTime(now);
+ processTaskRelation.setUpdateTime(now);
+ return processTaskRelation;
}
private void updateProcessDefiniteVersion(User loginUser, Map<String,
Object> result, ProcessDefinition processDefinition) {
@@ -227,7 +207,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
@@ -245,6 +226,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_HAS_DOWNSTREAM,
org.apache.commons.lang.StringUtils.join(downstreamList, ","));
return result;
}
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition,
processTaskRelationList);
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
||
TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
@@ -261,7 +243,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
private void updateRelation(User loginUser, Map<String, Object> result,
ProcessDefinition processDefinition,
List<ProcessTaskRelation>
processTaskRelationList) {
- updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(),
processDefinition.getVersion(), relationLogs,
Lists.newArrayList(), Boolean.TRUE);
@@ -312,7 +293,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
upstreamList.get(0).getProcessDefinitionCode());
return result;
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> processTaskRelationWaitRemove =
Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
if (preTaskCodeList.size() > 1) {
@@ -331,6 +313,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
}
processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition,
processTaskRelationList);
return result;
}
@@ -372,8 +355,10 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
downstreamList.get(0).getProcessDefinitionCode());
return result;
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
processTaskRelationList.removeIf(processTaskRelation ->
postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) &&
processTaskRelation.getPreTaskCode() == taskCode);
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition,
processTaskRelationList);
return result;
}
@@ -474,7 +459,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionCode);
return result;
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
@@ -511,6 +497,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelation.setPreTaskCode(0L);
processTaskRelationList.add(processTaskRelation);
}
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition,
processTaskRelationList);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8b07435..5470969 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -50,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@@ -97,14 +97,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
- private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+ private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessService processService;
- @Autowired
- private ProcessDefinitionMapper processDefinitionMapper;
-
/**
* create task definition
*
@@ -314,17 +311,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (delete > 0) {
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
- int deleteRelation = 0;
- for (ProcessTaskRelation processTaskRelation :
taskRelationList) {
- deleteRelation +=
processTaskRelationMapper.deleteById(processTaskRelation.getId());
- }
- if (deleteRelation == 0) {
- throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- }
long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
- updateProcessDefiniteVersion(loginUser, processDefinitionCode);
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ List<ProcessTaskRelation> relationList =
processTaskRelations.stream().filter(r -> r.getPostTaskCode() !=
taskCode).collect(Collectors.toList());
+ updateDag(loginUser, result, processDefinitionCode,
relationList, Lists.newArrayList());
+ } else {
+ putMsg(result, Status.SUCCESS);
}
- putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new
ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@@ -332,7 +325,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
- private int updateProcessDefiniteVersion(User loginUser, long
processDefinitionCode) {
+ private void updateDag(User loginUser, Map<String, Object> result, long
processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
+ List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@@ -341,7 +335,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
- return insertVersion;
+ List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(),
+ insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
+ if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, processDefinition);
+ } else {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
}
/**
@@ -356,50 +359,55 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, long
projectCode, long taskCode, String taskDefinitionJsonObj) {
Map<String, Object> result = new HashMap<>();
- int version = updateTask(loginUser, projectCode, taskCode,
taskDefinitionJsonObj, result);
- if (version <= 0) {
+ TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser,
projectCode, taskCode, taskDefinitionJsonObj, result);
+ if (taskDefinitionToUpdate == null) {
return result;
}
- handleRelation(loginUser, taskCode, version);
+ List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ if (!taskRelationList.isEmpty()) {
+ long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ updateDag(loginUser, result, processDefinitionCode,
processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
+ }
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
- private int updateTask(User loginUser, long projectCode, long taskCode,
String taskDefinitionJsonObj, Map<String, Object> result) {
+ private TaskDefinitionLog updateTask(User loginUser, long projectCode,
long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
result.putAll(projectService.checkProjectAndAuth(loginUser, project,
projectCode));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag()
== Flag.YES) {
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
TaskDefinitionLog taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition.equals(taskDefinitionToUpdate)) {
- return taskDefinition.getVersion();
+ return null;
}
if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate))
{
logger.error("task definition {} parameter invalid",
taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionToUpdate.getName());
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
Integer version =
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
- return Constants.EXIT_CODE_FAILURE;
+ return null;
}
Date now = new Date();
taskDefinitionToUpdate.setCode(taskCode);
@@ -419,42 +427,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
- return version;
- }
-
- private void handleRelation(User loginUser, long taskCode, Integer
version) {
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
- if (!processTaskRelationList.isEmpty()) {
- long processDefinitionCode =
processTaskRelationList.get(0).getProcessDefinitionCode();
- int definiteVersion = updateProcessDefiniteVersion(loginUser,
processDefinitionCode);
- List<ProcessTaskRelationLog> processTaskRelationLogList = new
ArrayList<>();
- int delete = 0;
- Date now = new Date();
- for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
- delete +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- if (processTaskRelationLog.getPreTaskCode() == taskCode) {
- processTaskRelationLog.setPreTaskVersion(version);
- }
- if (processTaskRelationLog.getPostTaskCode() == taskCode) {
- processTaskRelationLog.setPostTaskVersion(version);
- }
-
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- processTaskRelationLogList.add(processTaskRelationLog);
- }
- if (delete == 0) {
- throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- } else {
- int insertRelation =
processTaskRelationMapper.batchInsert(processTaskRelationLogList);
- int insertRelationLog =
processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
- if ((insertRelation & insertRelationLog) == 0) {
- throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- }
- }
- }
+ return taskDefinitionToUpdate;
}
/**
@@ -470,13 +443,23 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override
public Map<String, Object> updateTaskWithUpstream(User loginUser, long
projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes)
{
Map<String, Object> result = new HashMap<>();
- int version = updateTask(loginUser, projectCode, taskCode,
taskDefinitionJsonObj, result);
- if (version <= 0) {
+ TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser,
projectCode, taskCode, taskDefinitionJsonObj, result);
+ if (result.get(Constants.STATUS) != Status.SUCCESS &&
taskDefinitionToUpdate == null) {
return result;
}
+ List<ProcessTaskRelation> upstreamTaskRelations =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ Set<Long> upstreamCodeSet =
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
+ Set<Long> upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+ if (CollectionUtils.isEqualCollection(upstreamCodeSet,
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
+ putMsg(result, Status.SUCCESS);
+ return result;
+ } else {
+ if (taskDefinitionToUpdate == null) {
+ taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+ }
+ }
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
- if (StringUtils.isNotBlank(upstreamCodes)) {
- Set<Long> upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+ if (!upstreamTaskCodes.isEmpty()) {
List<TaskDefinition> upstreamTaskDefinitionList =
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
queryUpStreamTaskCodeMap =
upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinition -> taskDefinition));
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
@@ -488,76 +471,48 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} else {
queryUpStreamTaskCodeMap = new HashMap<>();
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
- if (!queryUpStreamTaskCodeMap.isEmpty() &&
processTaskRelationList.isEmpty()) {
- putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST,
StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA));
- throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
- }
- if (!processTaskRelationList.isEmpty()) {
- long processDefinitionCode =
processTaskRelationList.get(0).getProcessDefinitionCode();
- int definiteVersion = updateProcessDefiniteVersion(loginUser,
processDefinitionCode);
- List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
- Date now = new Date();
- int delete = 0;
- int deleteLog = 0;
+ if (!upstreamTaskRelations.isEmpty()) {
+ ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
+ List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
+ List<ProcessTaskRelation> relationList = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
- delete +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- deleteLog +=
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- if (processTaskRelationLog.getPreTaskCode() == taskCode) {
- processTaskRelationLog.setPreTaskVersion(version);
- }
- if (processTaskRelationLog.getPostTaskCode() == taskCode) {
- processTaskRelationLog.setPostTaskVersion(version);
- TaskDefinition definition =
queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
- if (definition == null) {
- processTaskRelationLog.setPreTaskCode(0L);
- processTaskRelationLog.setPreTaskVersion(0);
+ if (processTaskRelation.getPostTaskCode() == taskCode) {
+ if
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) &&
processTaskRelation.getPreTaskCode() != 0L) {
+
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
+ } else {
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelation.setPreTaskVersion(0);
+ relationList.add(processTaskRelation);
}
}
-
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
- relationLogs.add(processTaskRelationLog);
- }
- if ((delete & deleteLog) == 0) {
- throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
- if (!queryUpStreamTaskCodeMap.isEmpty()) {
- ProcessTaskRelationLog taskRelationLogDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)),
ProcessTaskRelationLog.class);
- assert taskRelationLogDeepCopy != null;
- for (TaskDefinition upstreamTask :
queryUpStreamTaskCodeMap.values()) {
-
taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode());
-
taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
- relationLogs.add(taskRelationLogDeepCopy);
- }
+ processTaskRelationList.removeAll(relationList);
+ for (Map.Entry<Long, TaskDefinition> queryUpStreamTask :
queryUpStreamTaskCodeMap.entrySet()) {
+ taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
+
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
+ processTaskRelationList.add(taskRelation);
}
- Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
-
relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode,
processTaskRelationLog -> processTaskRelationLog));
- if (taskRelationLogMap.containsKey(0L) &&
taskRelationLogMap.size() >= 3) {
- taskRelationLogMap.remove(0L);
- }
- int insertRelation =
processTaskRelationMapper.batchInsert(relationLogs);
- int insertRelationLog =
processTaskRelationLogMapper.batchInsert(relationLogs);
- if ((insertRelation & insertRelationLog) == 0) {
- putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ if (queryUpStreamTaskCodeMap.isEmpty() &&
!processTaskRelationList.isEmpty()) {
+ processTaskRelationList.add(processTaskRelationList.get(0));
}
+ updateDag(loginUser, result,
taskRelation.getProcessDefinitionCode(), processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
+
/**
- * Switch task definition
+ * switch task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param version the version user want to switch
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> switchVersion(User loginUser, long projectCode,
long taskCode, int version) {
Project project = projectMapper.queryByCode(projectCode);
@@ -581,9 +536,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionUpdate.setId(taskDefinition.getId());
int switchVersion =
taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (switchVersion > 0) {
- handleRelation(loginUser, taskCode, version);
- result.put(Constants.DATA_LIST, taskCode);
- putMsg(result, Status.SUCCESS);
+ List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ if (!taskRelationList.isEmpty()) {
+ long processDefinitionCode =
taskRelationList.get(0).getProcessDefinitionCode();
+ List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ updateDag(loginUser, result, processDefinitionCode,
processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
+ } else {
+ putMsg(result, Status.SUCCESS);
+ }
} else {
putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
}