caishunfeng commented on code in PR #13094:
URL: 
https://github.com/apache/dolphinscheduler/pull/13094#discussion_r1046823898


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +497,98 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelationResult = saveTaskRelation(loginUser, 
processDefinition, insertVersion);
+        if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        }
+        logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) 
{
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
+            processTaskRelationLog.setProjectCode(projectCode);
+            
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (CollectionUtils.isNotEmpty(taskRelations)) {
+            Set<Integer> processTaskRelationSet =
+                    
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = 
org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,

Review Comment:
   ```suggestion
               boolean isSame = 
CollectionUtils.isEqualCollection(processTaskRelationSet,
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,109 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        }
+        logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition 
processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+        Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? 
Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+        processDefinition.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinition.setUserId(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinition.setUpdateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+
+        int result = processDefinitionMapper.updateById(processDefinition);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
+            processTaskRelationLog.setProjectCode(projectCode);
+            
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = 
CollectionUtils.isEqualCollection(processTaskRelationSet,

Review Comment:
   ```suggestion
               boolean isSame = 
CollectionUtils.isEqualCollection(processTaskRelationSet,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to