This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a4e72b  pick 8270/8308 (#8355)
4a4e72b is described below

commit 4a4e72b0d271dc2a5b6e488257ce1a28628f52f7
Author: JinYong Li <[email protected]>
AuthorDate: Mon Feb 14 12:03:12 2022 +0800

    pick 8270/8308 (#8355)
---
 .../api/controller/TaskDefinitionController.java   |   4 +-
 .../impl/ProcessTaskRelationServiceImpl.java       | 117 ++++++------
 .../service/impl/TaskDefinitionServiceImpl.java    | 202 +++++++++------------
 3 files changed, 136 insertions(+), 187 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
index d00a052..9e59c39 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
@@ -60,6 +60,8 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * task definition controller
  */
@@ -121,7 +123,7 @@ public class TaskDefinitionController extends 
BaseController {
                                           @RequestParam(value = 
"processDefinitionCode", required = true) long processDefinitionCode,
                                           @RequestParam(value = 
"taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
                                           @RequestParam(value = 
"upstreamCodes", required = false) String upstreamCodes) {
-        Map<String, Object> result = 
taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, 
processDefinitionCode, taskDefinitionJsonObj, upstreamCodes);
+        Map<String, Object> result = 
taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, 
processDefinitionCode, taskDefinitionJsonObj, 
StringUtils.defaultString(upstreamCodes));
         return returnDataList(result);
     }
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index a864c61..5761584 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -73,9 +72,6 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
     @Autowired
-    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
-    @Autowired
     private TaskDefinitionLogMapper taskDefinitionLogMapper;
 
     @Autowired
@@ -115,75 +111,59 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, 
postTaskCode);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
+        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelation> processTaskRelations = 
Lists.newArrayList(processTaskRelationList);
         if (!processTaskRelations.isEmpty()) {
-            Map<Long, ProcessTaskRelation> preTaskCodeMap = 
processTaskRelations.stream()
+            Map<Long, ProcessTaskRelation> preTaskCodeMap = 
processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
                 .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, 
processTaskRelation -> processTaskRelation));
-            if (preTaskCodeMap.containsKey(preTaskCode) || 
(!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
-                putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, 
processDefinitionCode);
-                return result;
-            }
-            if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
-                ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(preTaskCodeMap.get(0L));
-                // delete no upstream
-                int delete = 
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                int deleteLog = 
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-                if ((delete & deleteLog) == 0) {
-                    putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                    throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            if (!preTaskCodeMap.isEmpty()) {
+                if (preTaskCodeMap.containsKey(preTaskCode) || 
(!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
+                    putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, 
processDefinitionCode);
+                    return result;
+                }
+                if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
+                    // delete no upstream
+                    processTaskRelations.remove(preTaskCodeMap.get(0L));
                 }
             }
         }
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
-        Date now = new Date();
-        List<ProcessTaskRelationLog> processTaskRelationLogs = new 
ArrayList<>();
+        TaskDefinition postTaskDefinition = 
taskDefinitionMapper.queryByCode(postTaskCode);
+        ProcessTaskRelation processTaskRelation = 
setRelation(processDefinition, postTaskDefinition);
         if (preTaskCode != 0L) {
-            // upstream is or not exist
-            List<ProcessTaskRelation> upstreamProcessTaskRelations = 
processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, 
preTaskCode);
             TaskDefinition preTaskDefinition = 
taskDefinitionMapper.queryByCode(preTaskCode);
-            if (upstreamProcessTaskRelations.isEmpty()) {
-                ProcessTaskRelationLog processTaskRelationLog = 
setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition);
-                processTaskRelationLog.setPreTaskCode(0L);
-                processTaskRelationLog.setPreTaskVersion(0);
-                processTaskRelationLogs.add(processTaskRelationLog);
+            List<ProcessTaskRelation> upstreamTaskRelationList = 
processTaskRelations.stream().filter(r -> r.getPostTaskCode() == 
preTaskCode).collect(Collectors.toList());
+            // upstream is or not exist
+            if (upstreamTaskRelationList.isEmpty()) {
+                ProcessTaskRelation preProcessTaskRelation = 
setRelation(processDefinition, preTaskDefinition);
+                preProcessTaskRelation.setPreTaskCode(0L);
+                preProcessTaskRelation.setPreTaskVersion(0);
+                processTaskRelations.add(preProcessTaskRelation);
             }
-            TaskDefinition postTaskDefinition = 
taskDefinitionMapper.queryByCode(postTaskCode);
-            ProcessTaskRelationLog processTaskRelationLog = 
setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
-            processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
-            
processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
-            processTaskRelationLogs.add(processTaskRelationLog);
-        } else {
-            TaskDefinition postTaskDefinition = 
taskDefinitionMapper.queryByCode(postTaskCode);
-            ProcessTaskRelationLog processTaskRelationLog = 
setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
-            processTaskRelationLog.setPreTaskCode(0L);
-            processTaskRelationLog.setPreTaskVersion(0);
-            processTaskRelationLogs.add(processTaskRelationLog);
-        }
-        int insert = 
processTaskRelationMapper.batchInsert(processTaskRelationLogs);
-        int insertLog = 
processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
-        if ((insert & insertLog) > 0) {
-            putMsg(result, Status.SUCCESS);
+            processTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
+            
processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
         } else {
-            putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            processTaskRelation.setPreTaskCode(0L);
+            processTaskRelation.setPreTaskVersion(0);
         }
+        processTaskRelations.add(processTaskRelation);
+        updateRelation(loginUser, result, processDefinition, 
processTaskRelations);
         return result;
     }
 
-    private ProcessTaskRelationLog setRelationLog(ProcessDefinition 
processDefinition, Date now, int userId, TaskDefinition taskDefinition) {
-        ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog();
-        
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
-        
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-        
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
-        processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
-        processTaskRelationLog.setConditionType(ConditionType.NONE);
-        processTaskRelationLog.setConditionParams("{}");
-        processTaskRelationLog.setCreateTime(now);
-        processTaskRelationLog.setUpdateTime(now);
-        processTaskRelationLog.setOperator(userId);
-        processTaskRelationLog.setOperateTime(now);
-        return processTaskRelationLog;
+    private ProcessTaskRelation setRelation(ProcessDefinition 
processDefinition, TaskDefinition taskDefinition) {
+        Date now = new Date();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(processDefinition.getProjectCode());
+        
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
+        
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
+        processTaskRelation.setPostTaskCode(taskDefinition.getCode());
+        processTaskRelation.setPostTaskVersion(taskDefinition.getVersion());
+        processTaskRelation.setConditionType(ConditionType.NONE);
+        processTaskRelation.setConditionParams("{}");
+        processTaskRelation.setCreateTime(now);
+        processTaskRelation.setUpdateTime(now);
+        return processTaskRelation;
     }
 
     private void updateProcessDefiniteVersion(User loginUser, Map<String, 
Object> result, ProcessDefinition processDefinition) {
@@ -227,7 +207,8 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
         if (CollectionUtils.isEmpty(processTaskRelationList)) {
             putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
@@ -245,6 +226,7 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.TASK_HAS_DOWNSTREAM, 
org.apache.commons.lang.StringUtils.join(downstreamList, ","));
             return result;
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, 
processTaskRelationList);
         if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
             || 
TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
@@ -261,7 +243,6 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
 
     private void updateRelation(User loginUser, Map<String, Object> result, 
ProcessDefinition processDefinition,
                                 List<ProcessTaskRelation> 
processTaskRelationList) {
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         List<ProcessTaskRelationLog> relationLogs = 
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
         int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinition.getCode(),
             processDefinition.getVersion(), relationLogs, 
Lists.newArrayList(), Boolean.TRUE);
@@ -312,7 +293,8 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
upstreamList.get(0).getProcessDefinitionCode());
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
         List<ProcessTaskRelation> processTaskRelationWaitRemove = 
Lists.newArrayList();
         for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
             if (preTaskCodeList.size() > 1) {
@@ -331,6 +313,7 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             }
         }
         processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, 
processTaskRelationList);
         return result;
     }
@@ -372,8 +355,10 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
downstreamList.get(0).getProcessDefinitionCode());
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
         processTaskRelationList.removeIf(processTaskRelation -> 
postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && 
processTaskRelation.getPreTaskCode() == taskCode);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, 
processTaskRelationList);
         return result;
     }
@@ -474,7 +459,8 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
processDefinitionCode);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
         if (CollectionUtils.isEmpty(processTaskRelationList)) {
             putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
@@ -511,6 +497,7 @@ public class ProcessTaskRelationServiceImpl extends 
BaseServiceImpl implements P
             processTaskRelation.setPreTaskCode(0L);
             processTaskRelationList.add(processTaskRelation);
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, 
processTaskRelationList);
         return result;
     }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8b07435..5470969 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -42,7 +42,6 @@ import 
org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -50,6 +49,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.service.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
@@ -97,14 +97,11 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
     @Autowired
-    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+    private ProcessDefinitionMapper processDefinitionMapper;
 
     @Autowired
     private ProcessService processService;
 
-    @Autowired
-    private ProcessDefinitionMapper processDefinitionMapper;
-
     /**
      * create task definition
      *
@@ -314,17 +311,13 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         if (delete > 0) {
             List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
             if (!taskRelationList.isEmpty()) {
-                int deleteRelation = 0;
-                for (ProcessTaskRelation processTaskRelation : 
taskRelationList) {
-                    deleteRelation += 
processTaskRelationMapper.deleteById(processTaskRelation.getId());
-                }
-                if (deleteRelation == 0) {
-                    throw new 
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-                }
                 long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
-                updateProcessDefiniteVersion(loginUser, processDefinitionCode);
+                List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+                List<ProcessTaskRelation> relationList = 
processTaskRelations.stream().filter(r -> r.getPostTaskCode() != 
taskCode).collect(Collectors.toList());
+                updateDag(loginUser, result, processDefinitionCode, 
relationList, Lists.newArrayList());
+            } else {
+                putMsg(result, Status.SUCCESS);
             }
-            putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
             throw new 
ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@@ -332,7 +325,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         return result;
     }
 
-    private int updateProcessDefiniteVersion(User loginUser, long 
processDefinitionCode) {
+    private void updateDag(User loginUser, Map<String, Object> result, long 
processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
+                                List<TaskDefinitionLog> taskDefinitionLogs) {
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(processDefinitionCode);
         if (processDefinition == null) {
             throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@@ -341,7 +335,16 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         if (insertVersion <= 0) {
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        return insertVersion;
+        List<ProcessTaskRelationLog> relationLogs = 
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinition.getCode(),
+            insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
+        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, processDefinition);
+        } else {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
     }
 
     /**
@@ -356,50 +359,55 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> updateTaskDefinition(User loginUser, long 
projectCode, long taskCode, String taskDefinitionJsonObj) {
         Map<String, Object> result = new HashMap<>();
-        int version = updateTask(loginUser, projectCode, taskCode, 
taskDefinitionJsonObj, result);
-        if (version <= 0) {
+        TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, 
projectCode, taskCode, taskDefinitionJsonObj, result);
+        if (taskDefinitionToUpdate == null) {
             return result;
         }
-        handleRelation(loginUser, taskCode, version);
+        List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        if (!taskRelationList.isEmpty()) {
+            long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
+            List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+            updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
+        }
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
-    private int updateTask(User loginUser, long projectCode, long taskCode, 
String taskDefinitionJsonObj, Map<String, Object> result) {
+    private TaskDefinitionLog updateTask(User loginUser, long projectCode, 
long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         result.putAll(projectService.checkProjectAndAuth(loginUser, project, 
projectCode));
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByCode(taskCode);
         if (taskDefinition == null) {
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() 
== Flag.YES) {
             putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         TaskDefinitionLog taskDefinitionToUpdate = 
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
         if (taskDefinition.equals(taskDefinitionToUpdate)) {
-            return taskDefinition.getVersion();
+            return null;
         }
         if (taskDefinitionToUpdate == null) {
             logger.error("taskDefinitionJson is not valid json");
             putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) 
{
             logger.error("task definition {} parameter invalid", 
taskDefinitionToUpdate.getName());
             putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, 
taskDefinitionToUpdate.getName());
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         Integer version = 
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
         if (version == null || version == 0) {
             putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
-            return Constants.EXIT_CODE_FAILURE;
+            return null;
         }
         Date now = new Date();
         taskDefinitionToUpdate.setCode(taskCode);
@@ -419,42 +427,7 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
-        return version;
-    }
-
-    private void handleRelation(User loginUser, long taskCode, Integer 
version) {
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByTaskCode(taskCode);
-        if (!processTaskRelationList.isEmpty()) {
-            long processDefinitionCode = 
processTaskRelationList.get(0).getProcessDefinitionCode();
-            int definiteVersion = updateProcessDefiniteVersion(loginUser, 
processDefinitionCode);
-            List<ProcessTaskRelationLog> processTaskRelationLogList = new 
ArrayList<>();
-            int delete = 0;
-            Date now = new Date();
-            for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
-                ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
-                delete += 
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
-                    processTaskRelationLog.setPreTaskVersion(version);
-                }
-                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
-                    processTaskRelationLog.setPostTaskVersion(version);
-                }
-                
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
-                processTaskRelationLog.setOperator(loginUser.getId());
-                processTaskRelationLog.setOperateTime(now);
-                processTaskRelationLog.setUpdateTime(now);
-                processTaskRelationLogList.add(processTaskRelationLog);
-            }
-            if (delete == 0) {
-                throw new 
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            } else {
-                int insertRelation = 
processTaskRelationMapper.batchInsert(processTaskRelationLogList);
-                int insertRelationLog = 
processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
-                if ((insertRelation & insertRelationLog) == 0) {
-                    throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                }
-            }
-        }
+        return taskDefinitionToUpdate;
     }
 
     /**
@@ -470,13 +443,23 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> updateTaskWithUpstream(User loginUser, long 
projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) 
{
         Map<String, Object> result = new HashMap<>();
-        int version = updateTask(loginUser, projectCode, taskCode, 
taskDefinitionJsonObj, result);
-        if (version <= 0) {
+        TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, 
projectCode, taskCode, taskDefinitionJsonObj, result);
+        if (result.get(Constants.STATUS) != Status.SUCCESS && 
taskDefinitionToUpdate == null) {
             return result;
         }
+        List<ProcessTaskRelation> upstreamTaskRelations = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        Set<Long> upstreamCodeSet = 
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
+        Set<Long> upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+        if (CollectionUtils.isEqualCollection(upstreamCodeSet, 
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
+            putMsg(result, Status.SUCCESS);
+            return result;
+        } else {
+            if (taskDefinitionToUpdate == null) {
+                taskDefinitionToUpdate = 
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+            }
+        }
         Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
-        if (StringUtils.isNotBlank(upstreamCodes)) {
-            Set<Long> upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+        if (!upstreamTaskCodes.isEmpty()) {
             List<TaskDefinition> upstreamTaskDefinitionList = 
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
             queryUpStreamTaskCodeMap = 
upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode,
 taskDefinition -> taskDefinition));
             // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
@@ -488,76 +471,48 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         } else {
             queryUpStreamTaskCodeMap = new HashMap<>();
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByTaskCode(taskCode);
-        if (!queryUpStreamTaskCodeMap.isEmpty() && 
processTaskRelationList.isEmpty()) {
-            putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, 
StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA));
-            throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
-        }
-        if (!processTaskRelationList.isEmpty()) {
-            long processDefinitionCode = 
processTaskRelationList.get(0).getProcessDefinitionCode();
-            int definiteVersion = updateProcessDefiniteVersion(loginUser, 
processDefinitionCode);
-            List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
-            Date now = new Date();
-            int delete = 0;
-            int deleteLog = 0;
+        if (!upstreamTaskRelations.isEmpty()) {
+            ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
+            List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
taskRelation.getProcessDefinitionCode());
+            List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
+            List<ProcessTaskRelation> relationList = Lists.newArrayList();
             for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
-                ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog(processTaskRelation);
-                delete += 
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                deleteLog += 
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-                processTaskRelationLog.setOperator(loginUser.getId());
-                processTaskRelationLog.setOperateTime(now);
-                processTaskRelationLog.setUpdateTime(now);
-                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
-                    processTaskRelationLog.setPreTaskVersion(version);
-                }
-                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
-                    processTaskRelationLog.setPostTaskVersion(version);
-                    TaskDefinition definition = 
queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
-                    if (definition == null) {
-                        processTaskRelationLog.setPreTaskCode(0L);
-                        processTaskRelationLog.setPreTaskVersion(0);
+                if (processTaskRelation.getPostTaskCode() == taskCode) {
+                    if 
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && 
processTaskRelation.getPreTaskCode() != 0L) {
+                        
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
+                    } else {
+                        processTaskRelation.setPreTaskCode(0L);
+                        processTaskRelation.setPreTaskVersion(0);
+                        relationList.add(processTaskRelation);
                     }
                 }
-                
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
-                relationLogs.add(processTaskRelationLog);
-            }
-            if ((delete & deleteLog) == 0) {
-                throw new 
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
             }
-            if (!queryUpStreamTaskCodeMap.isEmpty()) {
-                ProcessTaskRelationLog taskRelationLogDeepCopy = 
JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), 
ProcessTaskRelationLog.class);
-                assert taskRelationLogDeepCopy != null;
-                for (TaskDefinition upstreamTask : 
queryUpStreamTaskCodeMap.values()) {
-                    
taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode());
-                    
taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
-                    relationLogs.add(taskRelationLogDeepCopy);
-                }
+            processTaskRelationList.removeAll(relationList);
+            for (Map.Entry<Long, TaskDefinition> queryUpStreamTask : 
queryUpStreamTaskCodeMap.entrySet()) {
+                taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
+                
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
+                processTaskRelationList.add(taskRelation);
             }
-            Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
-                
relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode,
 processTaskRelationLog -> processTaskRelationLog));
-            if (taskRelationLogMap.containsKey(0L) && 
taskRelationLogMap.size() >= 3) {
-                taskRelationLogMap.remove(0L);
-            }
-            int insertRelation = 
processTaskRelationMapper.batchInsert(relationLogs);
-            int insertRelationLog = 
processTaskRelationLogMapper.batchInsert(relationLogs);
-            if ((insertRelation & insertRelationLog) == 0) {
-                putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            if (queryUpStreamTaskCodeMap.isEmpty() && 
!processTaskRelationList.isEmpty()) {
+                processTaskRelationList.add(processTaskRelationList.get(0));
             }
+            updateDag(loginUser, result, 
taskRelation.getProcessDefinitionCode(), processTaskRelations, 
Lists.newArrayList(taskDefinitionToUpdate));
         }
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
+
     /**
-     * Switch task definition
+     * switch task definition
      *
      * @param loginUser login user
      * @param projectCode project code
      * @param taskCode task code
      * @param version the version user want to switch
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> switchVersion(User loginUser, long projectCode, 
long taskCode, int version) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -581,9 +536,14 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         taskDefinitionUpdate.setId(taskDefinition.getId());
         int switchVersion = 
taskDefinitionMapper.updateById(taskDefinitionUpdate);
         if (switchVersion > 0) {
-            handleRelation(loginUser, taskCode, version);
-            result.put(Constants.DATA_LIST, taskCode);
-            putMsg(result, Status.SUCCESS);
+            List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+            if (!taskRelationList.isEmpty()) {
+                long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
+                List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+                updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
+            } else {
+                putMsg(result, Status.SUCCESS);
+            }
         } else {
             putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
         }

Reply via email to