caishunfeng commented on code in PR #13094:
URL: https://github.com/apache/dolphinscheduler/pull/13094#discussion_r1039286888


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java:
##########
@@ -97,7 +97,7 @@ public class TaskCreateRequest {
     private Integer memoryMax;
 
     @Schema(example = "upstream-task-codes1,upstream-task-codes2", description 
= "use , to split multiple upstream task codes")
-    private String upstreamTasksCodes;
+    private String upstreamTasksCodes = "0";

Review Comment:
   What does the default value mean?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }

Review Comment:
   ```suggestion
           }
           logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                   processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition 
processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+        Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? 
Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);

Review Comment:
   Please use two different objects for this.
   
   Why does the old one need to be updated with the new object?
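
To illustrate the point above, here is a minimal sketch of keeping the log row and the main-table update as two separate objects. `Definition` and `DefinitionLog` are hypothetical stand-ins for `ProcessDefinition` and `ProcessDefinitionLog`, reduced to a few fields; the helper names are assumptions and not part of the PR.

```java
public class SeparateObjectsSketch {

    // Hypothetical stand-in for ProcessDefinition (only the fields needed here).
    public static class Definition {
        public Integer id;
        public long code;
        public int version;

        public Definition(Integer id, long code, int version) {
            this.id = id;
            this.code = code;
            this.version = version;
        }
    }

    // Hypothetical stand-in for ProcessDefinitionLog.
    public static class DefinitionLog extends Definition {
        public int operator;

        public DefinitionLog(Definition source) {
            super(source.id, source.code, source.version);
        }
    }

    // Builds the object that is inserted into the log table.
    // The id is cleared so the log table can assign its own primary key.
    public static DefinitionLog toLogRow(Definition source, int newVersion, int operatorId) {
        DefinitionLog log = new DefinitionLog(source);
        log.id = null;
        log.version = newVersion;
        log.operator = operatorId;
        return log;
    }

    // Builds a second, independent object that is used to update the main table.
    // It keeps the original id, so no id juggling on the log object is needed.
    public static Definition toUpdatedDefinition(Definition source, int newVersion) {
        return new Definition(source.id, source.code, newVersion);
    }
}
```

With this split, the log object is never reused for `updateById`, so its id does not have to be set to `null` and then restored.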



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java:
##########
@@ -90,6 +90,7 @@ void deleteTaskProcessRelationV2(User loginUser,
      */
     List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
                                                            long taskCode,
+                                                           boolean needSyncDag,

Review Comment:
   It would be better to create a new method such as 
`updateUpstreamTaskDefinitionWithSyncDag`. WDYT?
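
A rough sketch of what that could look like: the existing method keeps its signature, and a new method carries the DAG-sync behaviour, with both delegating to one private helper. The class name, the simplified parameter list, and the returned step descriptions are all assumptions made for illustration; only the two method names come from this review.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncDagMethodSketch {

    // Existing entry point: signature unchanged, no DAG sync.
    public List<String> updateUpstreamTaskDefinition(long taskCode, List<Long> upstreamCodes) {
        return doUpdate(taskCode, upstreamCodes, false);
    }

    // New entry point proposed in the review: same update, plus DAG sync.
    public List<String> updateUpstreamTaskDefinitionWithSyncDag(long taskCode, List<Long> upstreamCodes) {
        return doUpdate(taskCode, upstreamCodes, true);
    }

    // Shared implementation; the boolean stays private and never leaks into the interface.
    // Returns a description of the steps performed (a placeholder for the real work).
    private List<String> doUpdate(long taskCode, List<Long> upstreamCodes, boolean syncDag) {
        List<String> steps = new ArrayList<>();
        steps.add("update " + upstreamCodes.size() + " upstream relation(s) of task " + taskCode);
        if (syncDag) {
            steps.add("sync dag");
        }
        return steps;
    }
}
```

Existing callers are unaffected, and the call site makes it obvious whether the DAG is synced.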



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition 
processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+        Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? 
Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) 
{
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
+            processTaskRelationLog.setProjectCode(projectCode);
+            
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = 
org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,
+                    taskRelationSet);
+            if (result) {
+                return Constants.EXIT_CODE_SUCCESS;

Review Comment:
   Please add a log statement here.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, 
insertVersion);

Review Comment:
   ```suggestion
           int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, 
insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,

Review Comment:
   This seems to just copy the function from `processService` without 
deleting the old one.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, 
insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) 
{
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
+            processTaskRelationLog.setProjectCode(projectCode);
+            
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {

Review Comment:
   ```suggestion
           if (CollectionUtils.isNotEmpty(taskRelations)) {
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, 
insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }

Review Comment:
   ```suggestion
           }
           logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                   processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> 
updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, 
processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, 
insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);

Review Comment:
   Why set the workflow version here? And only for the first one?
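
If the intent is for every returned relation to carry the new version, a sketch along these lines would cover all elements rather than only index 0. `Relation` is a hypothetical stand-in for `ProcessTaskRelation`, and `applyVersion` is an assumed helper name, not something in the PR.

```java
import java.util.List;

public class RelationVersionSketch {

    // Hypothetical stand-in for ProcessTaskRelation.
    public static class Relation {
        private int processDefinitionVersion;

        public int getProcessDefinitionVersion() {
            return processDefinitionVersion;
        }

        public void setProcessDefinitionVersion(int version) {
            this.processDefinitionVersion = version;
        }
    }

    // Sets the workflow version on every relation in the list.
    // An empty list is simply left alone, whereas get(0) would throw
    // IndexOutOfBoundsException in that case.
    public static void applyVersion(List<Relation> relations, int version) {
        for (Relation relation : relations) {
            relation.setProcessDefinitionVersion(version);
        }
    }
}
```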



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition 
processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+        Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? 
Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) 
{
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
+            processTaskRelationLog.setProjectCode(projectCode);
+            
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = 
org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,

Review Comment:
   ```suggestion
               boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet,
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition 
updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = 
processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, 
processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, 
processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+            throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, 
processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition 
processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new 
ProcessDefinitionLog(processDefinition);
+        Integer version = 
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? 
Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = 
processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition 
processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = 
taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) 
{

Review Comment:
   ```suggestion
           if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
