This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 26dad5e2a8 Fix update TaskDefinition error (#12060)
26dad5e2a8 is described below

commit 26dad5e2a8ee66e599e7c7da74bc408d4b9f67f6
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Sep 20 22:23:16 2022 +0800

    Fix update TaskDefinition error (#12060)
---
 .../service/impl/TaskDefinitionServiceImpl.java    | 296 +++++++++++++--------
 .../service/process/ProcessServiceImpl.java        |  45 +++-
 2 files changed, 221 insertions(+), 120 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 9e30359d76..947dfc2639 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -17,8 +17,16 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.permission.PermissionCheck;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -46,7 +54,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
-import org.apache.dolphinscheduler.api.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
@@ -73,8 +80,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.google.common.collect.Lists;
 
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
-
 /**
  * task definition service impl
  */
@@ -122,8 +127,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                                                     long projectCode,
                                                     String taskDefinitionJson) 
{
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode, 
TASK_DEFINITION_CREATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION_CREATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -153,7 +159,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         Map<String, Object> resData = new HashMap<>();
         resData.put("total", taskDefinitionLogs.size());
-        resData.put("code", 
StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
 ","));
+        resData.put("code", StringUtils
+                
.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
 ","));
         putMsg(result, Status.SUCCESS);
         result.put(Constants.DATA_LIST, resData);
         return result;
@@ -177,8 +184,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                                                        String 
taskDefinitionJsonObj,
                                                        String upstreamCodes) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION_CREATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION_CREATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -211,25 +219,27 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         long taskCode = taskDefinition.getCode();
         if (taskCode == 0) {
-            try {
-                taskCode = CodeGenerateUtils.getInstance().genCode();
-                taskDefinition.setCode(taskCode);
-            } catch (CodeGenerateException e) {
-                logger.error("Generate task definition code error.", e);
-                putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, 
taskDefinitionJsonObj);
-                return result;
-            }
+            taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
         }
-        List<ProcessTaskRelationLog> processTaskRelationLogList = 
Lists.newArrayList();
+        List<ProcessTaskRelationLog> processTaskRelationLogList =
+                processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode)
+                        .stream()
+                        .map(ProcessTaskRelationLog::new)
+                        .collect(Collectors.toList());
+
         if (StringUtils.isNotBlank(upstreamCodes)) {
-            Set<Long> upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+            Set<Long> upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
+                    .collect(Collectors.toSet());
             List<TaskDefinition> upstreamTaskDefinitionList = 
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
-            Set<Long> queryUpStreamTaskCodes = 
upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
+            Set<Long> queryUpStreamTaskCodes =
+                    
upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet());
             // upstreamTaskCodes - queryUpStreamTaskCodes
-            Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> 
!queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet());
+            Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> 
!queryUpStreamTaskCodes.contains(code))
+                    .collect(Collectors.toSet());
             if (!diffCode.isEmpty()) {
                 String taskCodes = StringUtils.join(diffCode, Constants.COMMA);
-                logger.error("Some task definitions with parameter 
upstreamCodes do not exist, taskDefinitionCodes:{}.", taskCodes);
+                logger.error("Some task definitions with parameter 
upstreamCodes do not exist, taskDefinitionCodes:{}.",
+                        taskCodes);
                 putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes);
                 return result;
             }
@@ -243,10 +253,6 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                 processTaskRelationLog.setConditionParams("{}");
                 processTaskRelationLogList.add(processTaskRelationLog);
             }
-            List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
-            if (!processTaskRelationList.isEmpty()) {
-                
processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
-            }
         } else {
             ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog();
             processTaskRelationLog.setPreTaskCode(0);
@@ -257,22 +263,30 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             processTaskRelationLog.setConditionParams("{}");
             processTaskRelationLogList.add(processTaskRelationLog);
         }
-        int insertResult = processService.saveTaskRelation(loginUser, 
projectCode, processDefinition.getCode(), processDefinition.getVersion(),
-            processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
+        int insertResult = processService.saveTaskRelation(loginUser, 
projectCode, processDefinition.getCode(),
+                processDefinition.getVersion(),
+                processTaskRelationLogList, Lists.newArrayList(), 
Boolean.TRUE);
         if (insertResult != Constants.EXIT_CODE_SUCCESS) {
-            logger.error("Save new version process task relations error, 
processDefinitionCode:{}, processDefinitionVersion:{}.", 
processDefinition.getCode(), processDefinition.getVersion());
+            logger.error(
+                    "Save new version process task relations error, 
processDefinitionCode:{}, processDefinitionVersion:{}.",
+                    processDefinition.getCode(), 
processDefinition.getVersion());
             putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
             throw new 
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
         } else
-            logger.info("Save new version process task relations complete, 
processDefinitionCode:{}, processDefinitionVersion:{}.", 
processDefinition.getCode(), processDefinition.getVersion());
+            logger.info(
+                    "Save new version process task relations complete, 
processDefinitionCode:{}, processDefinitionVersion:{}.",
+                    processDefinition.getCode(), 
processDefinition.getVersion());
 
-        int saveTaskResult = processService.saveTaskDefine(loginUser, 
projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
+        int saveTaskResult =
+                processService.saveTaskDefine(loginUser, projectCode, 
Lists.newArrayList(taskDefinition), Boolean.TRUE);
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
-            logger.error("Save task definition error, projectCode:{}, 
taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
+            logger.error("Save task definition error, projectCode:{}, 
taskDefinitionCode:{}.", projectCode,
+                    taskDefinition.getCode());
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         } else
-            logger.info("Save task definition complete, projectCode:{}, 
taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
+            logger.info("Save task definition complete, projectCode:{}, 
taskDefinitionCode:{}.", projectCode,
+                    taskDefinition.getCode());
         putMsg(result, Status.SUCCESS);
         result.put(Constants.DATA_LIST, taskDefinition);
         return result;
@@ -287,10 +301,12 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
      * @param taskName task name
      */
     @Override
-    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long 
projectCode, long processCode, String taskName) {
+    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long 
projectCode, long processCode,
+                                                         String taskName) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -318,8 +334,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long 
projectCode, long taskCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION_DELETE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION_DELETE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -335,33 +352,42 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             return result;
         }
         if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() 
== Flag.YES) {
-            logger.warn("Task definition can not be deleted due to task state 
online, taskDefinitionCode:{}.", taskCode);
+            logger.warn("Task definition can not be deleted due to task state 
online, taskDefinitionCode:{}.",
+                    taskCode);
             putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
+        List<ProcessTaskRelation> processTaskRelationList =
+                processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
             Set<Long> postTaskCodes = processTaskRelationList
                     .stream()
                     .map(ProcessTaskRelation::getPostTaskCode)
                     .collect(Collectors.toSet());
             String postTaskCodesStr = StringUtils.join(postTaskCodes, ",");
-            logger.warn("Task definition can not be deleted due to downstream 
tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
+            logger.warn(
+                    "Task definition can not be deleted due to downstream 
tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
                     taskCode, postTaskCodesStr);
             putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr);
             return result;
         }
         int delete = taskDefinitionMapper.deleteByCode(taskCode);
         if (delete > 0) {
-            List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+            List<ProcessTaskRelation> taskRelationList =
+                    processTaskRelationMapper.queryUpstreamByCode(projectCode, 
taskCode);
             if (!taskRelationList.isEmpty()) {
-                logger.info("Task definition has upstream tasks, start handle 
them after delete task, taskDefinitionCode:{}.", taskCode);
+                logger.info(
+                        "Task definition has upstream tasks, start handle them 
after delete task, taskDefinitionCode:{}.",
+                        taskCode);
                 long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
-                List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
-                List<ProcessTaskRelation> relationList = 
processTaskRelations.stream().filter(r -> r.getPostTaskCode() != 
taskCode).collect(Collectors.toList());
+                List<ProcessTaskRelation> processTaskRelations =
+                        
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+                List<ProcessTaskRelation> relationList = 
processTaskRelations.stream()
+                        .filter(r -> r.getPostTaskCode() != 
taskCode).collect(Collectors.toList());
                 updateDag(loginUser, result, processDefinitionCode, 
relationList, Lists.newArrayList());
             } else {
-                logger.info("Task definition delete complete, projectCode:{}, 
taskDefinitionCode:{}.", projectCode, taskCode);
+                logger.info("Task definition delete complete, projectCode:{}, 
taskDefinitionCode:{}.", projectCode,
+                        taskCode);
                 putMsg(result, Status.SUCCESS);
             }
         } else {
@@ -372,7 +398,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         return result;
     }
 
-    private void updateDag(User loginUser, Map<String, Object> result, long 
processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
+    private void updateDag(User loginUser, Map<String, Object> result, long 
processDefinitionCode,
+                           List<ProcessTaskRelation> processTaskRelationList,
                            List<TaskDefinitionLog> taskDefinitionLogs) {
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(processDefinitionCode);
         if (processDefinition == null) {
@@ -381,19 +408,27 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         int insertVersion = processService.saveProcessDefine(loginUser, 
processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion <= 0) {
-            logger.error("Update process definition error, projectCode:{}, 
processDefinitionCode:{}.", processDefinition.getProjectCode(), 
processDefinitionCode);
+            logger.error("Update process definition error, projectCode:{}, 
processDefinitionCode:{}.",
+                    processDefinition.getProjectCode(), processDefinitionCode);
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         } else
-            logger.info("Save new version process definition complete, 
projectCode:{}, processDefinitionCode:{}, newVersion:{}.", 
processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
-        List<ProcessTaskRelationLog> relationLogs = 
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
-        int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinition.getCode(),
+            logger.info(
+                    "Save new version process definition complete, 
projectCode:{}, processDefinitionCode:{}, newVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinitionCode, 
insertVersion);
+        List<ProcessTaskRelationLog> relationLogs =
+                
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
+                processDefinition.getCode(),
                 insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
-            logger.info("Save new version task relations complete, 
projectCode:{}, processDefinitionCode:{}, newVersion:{}.", 
processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
+            logger.info(
+                    "Save new version task relations complete, projectCode:{}, 
processDefinitionCode:{}, newVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinitionCode, 
insertVersion);
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
         } else {
-            logger.error("Update task relations error, projectCode:{}, 
processDefinitionCode:{}.", processDefinition.getProjectCode(), 
processDefinitionCode);
+            logger.error("Update task relations error, projectCode:{}, 
processDefinitionCode:{}.",
+                    processDefinition.getProjectCode(), processDefinitionCode);
             putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
@@ -409,18 +444,25 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
      */
     @Transactional
     @Override
-    public Map<String, Object> updateTaskDefinition(User loginUser, long 
projectCode, long taskCode, String taskDefinitionJsonObj) {
+    public Map<String, Object> updateTaskDefinition(User loginUser, long 
projectCode, long taskCode,
+                                                    String 
taskDefinitionJsonObj) {
         Map<String, Object> result = new HashMap<>();
-        TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, 
projectCode, taskCode, taskDefinitionJsonObj, result);
+        TaskDefinitionLog taskDefinitionToUpdate =
+                updateTask(loginUser, projectCode, taskCode, 
taskDefinitionJsonObj, result);
         if (taskDefinitionToUpdate == null) {
             return result;
         }
-        List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        List<ProcessTaskRelation> taskRelationList =
+                processTaskRelationMapper.queryUpstreamByCode(projectCode, 
taskCode);
         if (!taskRelationList.isEmpty()) {
-            logger.info("Task definition has upstream tasks, start handle them 
after update task, taskDefinitionCode:{}.", taskCode);
+            logger.info(
+                    "Task definition has upstream tasks, start handle them 
after update task, taskDefinitionCode:{}.",
+                    taskCode);
             long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
-            List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
-            updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
+            List<ProcessTaskRelation> processTaskRelations =
+                    processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+            updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations,
+                    Lists.newArrayList(taskDefinitionToUpdate));
         }
         logger.info("Update task definition complete, projectCode:{}, 
taskDefinitionCode:{}.", projectCode, taskCode);
         result.put(Constants.DATA_LIST, taskCode);
@@ -428,10 +470,11 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         return result;
     }
 
-    private TaskDefinitionLog updateTask(User loginUser, long projectCode, 
long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
+    private TaskDefinitionLog updateTask(User loginUser, long projectCode, 
long taskCode, String taskDefinitionJsonObj,
+                                         Map<String, Object> result) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        result.putAll(projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION_UPDATE));
+        // check user access for project
+        result.putAll(projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION_UPDATE));
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return null;
         }
@@ -444,12 +487,14 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() 
== Flag.YES) {
             // if stream, can update task definition without online check
             if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) 
{
-                logger.warn("Only {} type task can be updated without online 
check, taskDefinitionCode:{}.", TaskExecuteType.STREAM, taskCode);
+                logger.warn("Only {} type task can be updated without online 
check, taskDefinitionCode:{}.",
+                        TaskExecuteType.STREAM, taskCode);
                 putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
                 return null;
             }
         }
-        TaskDefinitionLog taskDefinitionToUpdate = 
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+        TaskDefinitionLog taskDefinitionToUpdate =
+                JSONUtils.parseObject(taskDefinitionJsonObj, 
TaskDefinitionLog.class);
         if (taskDefinition.equals(taskDefinitionToUpdate)) {
             logger.warn("Task definition does not need update because no 
change, taskDefinitionCode:{}.", taskCode);
             return null;
@@ -464,13 +509,15 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                 .taskParams(taskDefinitionToUpdate.getTaskParams())
                 .dependence(taskDefinitionToUpdate.getDependence())
                 .build())) {
-            logger.warn("Task definition parameters are invalid, 
taskDefinitionName:{}.", taskDefinitionToUpdate.getName());
+            logger.warn("Task definition parameters are invalid, 
taskDefinitionName:{}.",
+                    taskDefinitionToUpdate.getName());
             putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, 
taskDefinitionToUpdate.getName());
             return null;
         }
         Integer version = 
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
         if (version == null || version == 0) {
-            logger.error("Max version task definitionLog can not be found in 
database, taskDefinitionCode:{}.", taskCode);
+            logger.error("Max version task definitionLog can not be found in 
database, taskDefinitionCode:{}.",
+                    taskCode);
             putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
             return null;
         }
@@ -490,11 +537,13 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         taskDefinitionToUpdate.setId(null);
         int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
         if ((update & insert) != 1) {
-            logger.error("Update task definition or definitionLog error, 
projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
+            logger.error("Update task definition or definitionLog error, 
projectCode:{}, taskDefinitionCode:{}.",
+                    projectCode, taskCode);
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         } else
-            logger.info("Update task definition and definitionLog complete, 
projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
+            logger.info(
+                    "Update task definition and definitionLog complete, 
projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
                     projectCode, taskCode, 
taskDefinitionToUpdate.getVersion());
         return taskDefinitionToUpdate;
     }
@@ -510,17 +559,22 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
      * @return update result code
      */
     @Override
-    public Map<String, Object> updateTaskWithUpstream(User loginUser, long 
projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) 
{
+    public Map<String, Object> updateTaskWithUpstream(User loginUser, long 
projectCode, long taskCode,
+                                                      String 
taskDefinitionJsonObj, String upstreamCodes) {
         Map<String, Object> result = new HashMap<>();
-        TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, 
projectCode, taskCode, taskDefinitionJsonObj, result);
+        TaskDefinitionLog taskDefinitionToUpdate =
+                updateTask(loginUser, projectCode, taskCode, 
taskDefinitionJsonObj, result);
         if (result.get(Constants.STATUS) != Status.SUCCESS && 
taskDefinitionToUpdate == null) {
             return result;
         }
-        List<ProcessTaskRelation> upstreamTaskRelations = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
-        Set<Long> upstreamCodeSet = 
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
+        List<ProcessTaskRelation> upstreamTaskRelations =
+                processTaskRelationMapper.queryUpstreamByCode(projectCode, 
taskCode);
+        Set<Long> upstreamCodeSet =
+                
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
         Set<Long> upstreamTaskCodes = Collections.emptySet();
         if (StringUtils.isNotEmpty(upstreamCodes)) {
-            upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+            upstreamTaskCodes = 
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
+                    .collect(Collectors.toSet());
         }
         if (CollectionUtils.isEqualCollection(upstreamCodeSet, 
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
             putMsg(result, Status.SUCCESS);
@@ -533,12 +587,14 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
         if (!upstreamTaskCodes.isEmpty()) {
             List<TaskDefinition> upstreamTaskDefinitionList = 
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
-            queryUpStreamTaskCodeMap = 
upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode,
 taskDefinition -> taskDefinition));
+            queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinition -> taskDefinition));
             // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
             upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
             if (!upstreamTaskCodes.isEmpty()) {
                 String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, 
Constants.COMMA);
-                logger.error("Some task definitions in parameter 
upstreamTaskCodes do not exist, notExistTaskCodes:{}.", notExistTaskCodes);
+                logger.error("Some task definitions in parameter 
upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
+                        notExistTaskCodes);
                 putMsg(result, Status.TASK_DEFINE_NOT_EXIST, 
notExistTaskCodes);
                 return result;
             }
@@ -547,12 +603,14 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         if (!upstreamTaskRelations.isEmpty()) {
             ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
-            List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
taskRelation.getProcessDefinitionCode());
+            List<ProcessTaskRelation> processTaskRelations =
+                    processTaskRelationMapper.queryByProcessCode(projectCode, 
taskRelation.getProcessDefinitionCode());
             List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
             List<ProcessTaskRelation> relationList = Lists.newArrayList();
             for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
                 if (processTaskRelation.getPostTaskCode() == taskCode) {
-                    if 
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && 
processTaskRelation.getPreTaskCode() != 0L) {
+                    if 
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode())
+                            && processTaskRelation.getPreTaskCode() != 0L) {
                         
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
                     } else {
                         processTaskRelation.setPreTaskCode(0L);
@@ -570,16 +628,17 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             if (queryUpStreamTaskCodeMap.isEmpty() && 
!processTaskRelationList.isEmpty()) {
                 processTaskRelationList.add(processTaskRelationList.get(0));
             }
-            updateDag(loginUser, result, 
taskRelation.getProcessDefinitionCode(), processTaskRelations, 
Lists.newArrayList(taskDefinitionToUpdate));
+            updateDag(loginUser, result, 
taskRelation.getProcessDefinitionCode(), processTaskRelations,
+                    Lists.newArrayList(taskDefinitionToUpdate));
         }
-        logger.info("Update task with upstream tasks complete, projectCode:{}, 
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
+        logger.info(
+                "Update task with upstream tasks complete, projectCode:{}, 
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
                 projectCode, taskCode, upstreamTaskCodes);
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS);
         return result;
     }
 
-
     /**
      * switch task definition
      *
@@ -592,13 +651,15 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> switchVersion(User loginUser, long projectCode, 
long taskCode, int version) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, 
project,projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
         if (processService.isTaskOnline(taskCode)) {
-            logger.warn("Task definition version can not be switched due to 
process definition is {}, taskDefinitionCode:{}.",
+            logger.warn(
+                    "Task definition version can not be switched due to 
process definition is {}, taskDefinitionCode:{}.",
                     ReleaseState.ONLINE.getDescp(), taskCode);
             putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
             return result;
@@ -609,20 +670,28 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, 
String.valueOf(taskCode));
             return result;
         }
-        TaskDefinitionLog taskDefinitionUpdate = 
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
+        TaskDefinitionLog taskDefinitionUpdate =
+                
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
         taskDefinitionUpdate.setUserId(loginUser.getId());
         taskDefinitionUpdate.setUpdateTime(new Date());
         taskDefinitionUpdate.setId(taskDefinition.getId());
         int switchVersion = 
taskDefinitionMapper.updateById(taskDefinitionUpdate);
         if (switchVersion > 0) {
-            List<ProcessTaskRelation> taskRelationList = 
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+            List<ProcessTaskRelation> taskRelationList =
+                    processTaskRelationMapper.queryUpstreamByCode(projectCode, 
taskCode);
             if (!taskRelationList.isEmpty()) {
-                logger.info("Task definition has upstream tasks, start handle 
them after switch task, taskDefinitionCode:{}.", taskCode);
+                logger.info(
+                        "Task definition has upstream tasks, start handle them 
after switch task, taskDefinitionCode:{}.",
+                        taskCode);
                 long processDefinitionCode = 
taskRelationList.get(0).getProcessDefinitionCode();
-                List<ProcessTaskRelation> processTaskRelations = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
-                updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
+                List<ProcessTaskRelation> processTaskRelations =
+                        
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
+                updateDag(loginUser, result, processDefinitionCode, 
processTaskRelations,
+                        Lists.newArrayList(taskDefinitionUpdate));
             } else {
-                logger.info("Task definition version switch complete, switch 
task version to {}, taskDefinitionCode:{}.", version, taskCode);
+                logger.info(
+                        "Task definition version switch complete, switch task 
version to {}, taskDefinitionCode:{}.",
+                        version, taskCode);
                 putMsg(result, Status.SUCCESS);
             }
         } else {
@@ -641,7 +710,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         Result result = new Result();
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
-        Map<String, Object> checkResult = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_VERSION_VIEW);
+        Map<String, Object> checkResult =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_VERSION_VIEW);
         Status resultStatus = (Status) checkResult.get(Constants.STATUS);
         if (resultStatus != Status.SUCCESS) {
             putMsg(result, resultStatus);
@@ -649,7 +719,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         PageInfo<TaskDefinitionLog> pageInfo = new PageInfo<>(pageNo, 
pageSize);
         Page<TaskDefinitionLog> page = new Page<>(pageNo, pageSize);
-        IPage<TaskDefinitionLog> taskDefinitionVersionsPaging = 
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, 
projectCode);
+        IPage<TaskDefinitionLog> taskDefinitionVersionsPaging =
+                
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, 
projectCode);
         List<TaskDefinitionLog> taskDefinitionLogs = 
taskDefinitionVersionsPaging.getRecords();
 
         pageInfo.setTotalList(taskDefinitionLogs);
@@ -662,8 +733,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> deleteByCodeAndVersion(User loginUser, long 
projectCode, long taskCode, int version) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION_DELETE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION_DELETE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -674,14 +746,16 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, 
String.valueOf(taskCode));
         } else {
             if (taskDefinition.getVersion() == version) {
-                logger.warn("Task definition can not be deleted due to version 
is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.",
+                logger.warn(
+                        "Task definition can not be deleted due to version is 
being used, projectCode:{}, taskDefinitionCode:{}, version:{}.",
                         projectCode, taskCode, version);
                 putMsg(result, Status.MAIN_TABLE_USING_VERSION);
                 return result;
             }
             int delete = 
taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version);
             if (delete > 0) {
-                logger.info("Task definition version delete complete, 
projectCode:{}, taskDefinitionCode:{}, version:{}.",
+                logger.info(
+                        "Task definition version delete complete, 
projectCode:{}, taskDefinitionCode:{}, version:{}.",
                         projectCode, taskCode, version);
                 putMsg(result, Status.SUCCESS);
             } else {
@@ -696,8 +770,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
     @Override
     public Map<String, Object> queryTaskDefinitionDetail(User loginUser, long 
projectCode, long taskCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -724,8 +799,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
                                                 Integer pageSize) {
         Result result = new Result();
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> checkResult = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION);
+        // check user access for project
+        Map<String, Object> checkResult =
+                projectService.checkProjectAndAuth(loginUser, project, 
projectCode, TASK_DEFINITION);
         Status resultStatus = (Status) checkResult.get(Constants.STATUS);
         if (resultStatus != Status.SUCCESS) {
             putMsg(result, resultStatus);
@@ -733,8 +809,9 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
         }
         taskType = taskType == null ? StringUtils.EMPTY : taskType;
         Page<TaskMainInfo> page = new Page<>(pageNo, pageSize);
-        IPage<TaskMainInfo> taskMainInfoIPage = 
taskDefinitionMapper.queryDefineListPaging(page, projectCode, 
searchWorkflowName,
-                searchTaskName, taskType, taskExecuteType);
+        IPage<TaskMainInfo> taskMainInfoIPage =
+                taskDefinitionMapper.queryDefineListPaging(page, projectCode, 
searchWorkflowName,
+                        searchTaskName, taskType, taskExecuteType);
         List<TaskMainInfo> records = taskMainInfoIPage.getRecords();
         if (!records.isEmpty()) {
             Map<Long, TaskMainInfo> taskMainInfoMap = new HashMap<>();
@@ -800,10 +877,11 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
      */
     @Transactional
     @Override
-    public Map<String, Object> releaseTaskDefinition(User loginUser, long 
projectCode, long code, ReleaseState releaseState) {
+    public Map<String, Object> releaseTaskDefinition(User loginUser, long 
projectCode, long code,
+                                                     ReleaseState 
releaseState) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+        // check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
         Status resultStatus = (Status) result.get(Constants.STATUS);
         if (resultStatus != Status.SUCCESS) {
             return result;
@@ -817,7 +895,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
             return result;
         }
-        TaskDefinitionLog taskDefinitionLog = 
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, 
taskDefinition.getVersion());
+        TaskDefinitionLog taskDefinitionLog =
+                taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, 
taskDefinition.getVersion());
         if (taskDefinitionLog == null) {
             logger.error("Task definition does not exist, 
taskDefinitionCode:{}.", code);
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
@@ -831,8 +910,10 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             case ONLINE:
                 String resourceIds = taskDefinition.getResourceIds();
                 if (StringUtils.isNotBlank(resourceIds)) {
-                    Integer[] resourceIdArray = 
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
-                    PermissionCheck<Integer> permissionCheck = new 
PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService, 
resourceIdArray, loginUser.getId(), logger);
+                    Integer[] resourceIdArray =
+                            
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
+                    PermissionCheck<Integer> permissionCheck = new 
PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID,
+                            processService, resourceIdArray, 
loginUser.getId(), logger);
                     try {
                         permissionCheck.checkPermission();
                     } catch (Exception e) {
@@ -856,7 +937,8 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
-        logger.error("Update taskDefinition state or taskDefinitionLog state 
to complete, taskDefinitionCode:{}.", code);
+        logger.info("Update taskDefinition state or taskDefinitionLog state 
to complete, taskDefinitionCode:{}.",
+                code);
         putMsg(result, Status.SUCCESS);
         return result;
     }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 6536a6b4ed..09ccbd7d10 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -602,7 +602,8 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public void removeTaskLogFile(Integer processInstanceId) {
         ProcessInstance processInstance = 
processInstanceMapper.selectById(processInstanceId);
-        List<TaskInstance> taskInstanceList = 
findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
+        List<TaskInstance> taskInstanceList =
+                findValidTaskListByProcessId(processInstanceId, 
processInstance.getTestFlag());
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
@@ -623,7 +624,8 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public void deleteWorkTaskInstanceByProcessInstanceId(int 
processInstanceId) {
         ProcessInstance processInstance = 
processInstanceMapper.selectById(processInstanceId);
-        List<TaskInstance> taskInstanceList = 
findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
+        List<TaskInstance> taskInstanceList =
+                findValidTaskListByProcessId(processInstanceId, 
processInstance.getTestFlag());
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
@@ -1037,7 +1039,8 @@ public class ProcessServiceImpl implements ProcessService 
{
             case COMPLEMENT_DATA:
                 // delete all the valid tasks when complement data if id is 
not null
                 if (processInstance.getId() != null) {
-                    List<TaskInstance> taskInstanceList = 
this.findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag());
+                    List<TaskInstance> taskInstanceList =
+                            
this.findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag());
                     for (TaskInstance taskInstance : taskInstanceList) {
                         taskInstance.setFlag(Flag.NO);
                         this.updateTaskInstance(taskInstance);
@@ -1051,7 +1054,8 @@ public class ProcessServiceImpl implements ProcessService 
{
                     
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 }
                 // delete all the valid tasks when repeat running
-                List<TaskInstance> validTaskList = 
findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag());
+                List<TaskInstance> validTaskList =
+                        findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag());
                 for (TaskInstance taskInstance : validTaskList) {
                     taskInstance.setFlag(Flag.NO);
                     updateTaskInstance(taskInstance);
@@ -1654,7 +1658,8 @@ public class ProcessServiceImpl implements ProcessService 
{
         if (failureStrategy == FailureStrategy.CONTINUE) {
             return true;
         }
-        List<TaskInstance> taskInstances = 
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(),taskInstance.getTestFlag());
+        List<TaskInstance> taskInstances =
+                
this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), 
taskInstance.getTestFlag());
 
         for (TaskInstance task : taskInstances) {
             if (task.getState() == TaskExecutionStatus.FAILURE
@@ -1862,7 +1867,8 @@ public class ProcessServiceImpl implements ProcessService 
{
     @Override
     public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer 
processInstanceId) {
         ProcessInstance processInstance = 
processInstanceMapper.selectById(processInstanceId);
-        return 
taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, 
processInstance.getTestFlag());
+        return 
taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
+                processInstance.getTestFlag());
     }
 
     /**
@@ -2194,7 +2200,8 @@ public class ProcessServiceImpl implements ProcessService 
{
      * @return process instance
      */
     @Override
-    public ProcessInstance findLastSchedulerProcessInterval(Long 
definitionCode, DateInterval dateInterval, int testFlag) {
+    public ProcessInstance findLastSchedulerProcessInterval(Long 
definitionCode, DateInterval dateInterval,
+                                                            int testFlag) {
         return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
                 dateInterval.getStartTime(),
                 dateInterval.getEndTime(),
@@ -2535,6 +2542,7 @@ public class ProcessServiceImpl implements ProcessService 
{
             
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
             updateTaskDefinitionLogs.add(taskDefinitionLog);
         }
+
         if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
             List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
                     .stream()
@@ -2554,15 +2562,24 @@ public class ProcessServiceImpl implements 
ProcessService {
 
         // for each taskDefinitionLog, we will insert a new version into db
         // and update the origin one if exist
-        int updateResult = updateTaskDefinitionLogs.size();
-        int insertResult = newTaskDefinitionLogs.size();
-        if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
-            insertResult = 
taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
+        int updateResult = 0;
+        int insertResult = 0;
+        if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) {
+            insertResult += 
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+        }
+        if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
+            insertResult += 
taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs);
         }
 
         if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && 
Boolean.TRUE.equals(syncDefine)) {
             updateResult += 
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
         }
+        if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && 
Boolean.TRUE.equals(syncDefine)) {
+            for (TaskDefinitionLog taskDefinitionLog : 
updateTaskDefinitionLogs) {
+                updateResult += 
taskDefinitionMapper.updateById(taskDefinitionLog);
+            }
+        }
+
         return (insertResult & updateResult) > 0 ? 1 : 
Constants.EXIT_CODE_SUCCESS;
     }
 
@@ -3157,7 +3174,8 @@ public class ProcessServiceImpl implements ProcessService 
{
         ProcessInstance processInstance = 
findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
         if (processInstance != null
                 && (processInstance.getState().isFailure() || 
processInstance.getState().isStop())) {
-            List<TaskInstance> validTaskList = 
findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag());
+            List<TaskInstance> validTaskList =
+                    findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag());
             List<Long> instanceTaskCodeList =
                     
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
             List<ProcessTaskRelation> taskRelations = 
findRelationByCode(processInstance.getProcessDefinitionCode(),
@@ -3183,7 +3201,8 @@ public class ProcessServiceImpl implements ProcessService 
{
     @Override
     public Integer queryTestDataSourceId(Integer onlineDataSourceId) {
         Integer testDataSourceId = 
dataSourceMapper.queryTestDataSourceId(onlineDataSourceId);
-        if (testDataSourceId!=null) return testDataSourceId;
+        if (testDataSourceId != null)
+            return testDataSourceId;
         return null;
     }
 }

Reply via email to